clvmd.c revision 1.2 1 1.2 dholland /* $NetBSD: clvmd.c,v 1.2 2015/11/09 00:53:57 dholland Exp $ */
2 1.1 haad
3 1.1 haad /*
4 1.1 haad * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved.
5 1.2 dholland * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
6 1.1 haad *
7 1.1 haad * This file is part of LVM2.
8 1.1 haad *
9 1.1 haad * This copyrighted material is made available to anyone wishing to use,
10 1.1 haad * modify, copy, or redistribute it subject to the terms and conditions
11 1.1 haad * of the GNU General Public License v.2.
12 1.1 haad *
13 1.1 haad * You should have received a copy of the GNU General Public License
14 1.1 haad * along with this program; if not, write to the Free Software Foundation,
15 1.1 haad * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 1.1 haad */
17 1.1 haad
18 1.1 haad /*
19 1.1 haad * CLVMD: Cluster LVM daemon
20 1.1 haad */
21 1.1 haad
22 1.1 haad #define _GNU_SOURCE
23 1.1 haad #define _FILE_OFFSET_BITS 64
24 1.1 haad
25 1.1 haad #include <configure.h>
26 1.1 haad #include <libdevmapper.h>
27 1.1 haad
28 1.1 haad #include <pthread.h>
29 1.1 haad #include <sys/types.h>
30 1.1 haad #include <sys/stat.h>
31 1.1 haad #include <sys/socket.h>
32 1.1 haad #include <sys/uio.h>
33 1.1 haad #include <sys/un.h>
34 1.1 haad #include <sys/time.h>
35 1.1 haad #include <sys/ioctl.h>
36 1.1 haad #include <sys/utsname.h>
37 1.1 haad #include <netinet/in.h>
38 1.1 haad #include <stdio.h>
39 1.1 haad #include <stdlib.h>
40 1.1 haad #include <stddef.h>
41 1.1 haad #include <stdarg.h>
42 1.1 haad #include <signal.h>
43 1.1 haad #include <unistd.h>
44 1.1 haad #include <fcntl.h>
45 1.1 haad #include <getopt.h>
46 1.1 haad #include <syslog.h>
47 1.1 haad #include <errno.h>
48 1.1 haad #include <limits.h>
49 1.2 dholland #ifdef HAVE_COROSYNC_CONFDB_H
50 1.2 dholland #include <corosync/confdb.h>
51 1.2 dholland #endif
52 1.1 haad
53 1.1 haad #include "clvmd-comms.h"
54 1.1 haad #include "lvm-functions.h"
55 1.1 haad #include "clvm.h"
56 1.2 dholland #include "lvm-version.h"
57 1.1 haad #include "clvmd.h"
58 1.1 haad #include "refresh_clvmd.h"
59 1.1 haad #include "lvm-logging.h"
60 1.1 haad
61 1.1 haad #ifndef TRUE
62 1.1 haad #define TRUE 1
63 1.1 haad #endif
64 1.1 haad #ifndef FALSE
65 1.1 haad #define FALSE 0
66 1.1 haad #endif
67 1.1 haad
68 1.1 haad #define MAX_RETRIES 4
69 1.1 haad
70 1.1 haad #define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
71 1.1 haad
72 1.1 haad /* Head of the fd list. Also contains
73 1.1 haad the cluster_socket details */
74 1.1 haad static struct local_client local_client_head;
75 1.1 haad
76 1.1 haad static unsigned short global_xid = 0; /* Last transaction ID issued */
77 1.1 haad
78 1.1 haad struct cluster_ops *clops = NULL;
79 1.1 haad
80 1.1 haad static char our_csid[MAX_CSID_LEN];
81 1.1 haad static unsigned max_csid_len;
82 1.1 haad static unsigned max_cluster_message;
83 1.1 haad static unsigned max_cluster_member_name_len;
84 1.1 haad
85 1.1 haad /* Structure of items on the LVM thread list */
86 1.1 haad struct lvm_thread_cmd {
87 1.1 haad struct dm_list list;
88 1.1 haad
89 1.1 haad struct local_client *client;
90 1.1 haad struct clvm_header *msg;
91 1.1 haad char csid[MAX_CSID_LEN];
92 1.1 haad int remote; /* Flag */
93 1.1 haad int msglen;
94 1.1 haad unsigned short xid;
95 1.1 haad };
96 1.1 haad
97 1.1 haad debug_t debug;
98 1.1 haad static pthread_t lvm_thread;
99 1.1 haad static pthread_mutex_t lvm_thread_mutex;
100 1.1 haad static pthread_cond_t lvm_thread_cond;
101 1.1 haad static pthread_mutex_t lvm_start_mutex;
102 1.1 haad static struct dm_list lvm_cmd_head;
103 1.1 haad static volatile sig_atomic_t quit = 0;
104 1.1 haad static volatile sig_atomic_t reread_config = 0;
105 1.1 haad static int child_pipe[2];
106 1.1 haad
107 1.1 haad /* Reasons the daemon failed initialisation */
108 1.1 haad #define DFAIL_INIT 1
109 1.1 haad #define DFAIL_LOCAL_SOCK 2
110 1.1 haad #define DFAIL_CLUSTER_IF 3
111 1.1 haad #define DFAIL_MALLOC 4
112 1.1 haad #define DFAIL_TIMEOUT 5
113 1.1 haad #define SUCCESS 0
114 1.1 haad
115 1.2 dholland typedef enum {IF_AUTO, IF_CMAN, IF_GULM, IF_OPENAIS, IF_COROSYNC} if_type_t;
116 1.2 dholland
117 1.2 dholland typedef void *(lvm_pthread_fn_t)(void*);
118 1.2 dholland
119 1.1 haad /* Prototypes for code further down */
120 1.1 haad static void sigusr2_handler(int sig);
121 1.1 haad static void sighup_handler(int sig);
122 1.1 haad static void sigterm_handler(int sig);
123 1.1 haad static void send_local_reply(struct local_client *client, int status,
124 1.1 haad int clientid);
125 1.1 haad static void free_reply(struct local_client *client);
126 1.1 haad static void send_version_message(void);
127 1.1 haad static void *pre_and_post_thread(void *arg);
128 1.1 haad static int send_message(void *buf, int msglen, const char *csid, int fd,
129 1.1 haad const char *errtext);
130 1.1 haad static int read_from_local_sock(struct local_client *thisfd);
131 1.1 haad static int process_local_command(struct clvm_header *msg, int msglen,
132 1.1 haad struct local_client *client,
133 1.1 haad unsigned short xid);
134 1.1 haad static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
135 1.1 haad const char *csid);
136 1.1 haad static int process_reply(const struct clvm_header *msg, int msglen,
137 1.1 haad const char *csid);
138 1.1 haad static int open_local_sock(void);
139 1.1 haad static int check_local_clvmd(void);
140 1.1 haad static struct local_client *find_client(int clientid);
141 1.1 haad static void main_loop(int local_sock, int cmd_timeout);
142 1.1 haad static void be_daemon(int start_timeout);
143 1.1 haad static int check_all_clvmds_running(struct local_client *client);
144 1.1 haad static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
145 1.1 haad int len, const char *csid,
146 1.1 haad struct local_client **new_client);
147 1.2 dholland static void lvm_thread_fn(void *) __attribute__ ((noreturn));
148 1.1 haad static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
149 1.1 haad int msglen, const char *csid);
150 1.1 haad static int distribute_command(struct local_client *thisfd);
151 1.1 haad static void hton_clvm(struct clvm_header *hdr);
152 1.1 haad static void ntoh_clvm(struct clvm_header *hdr);
153 1.1 haad static void add_reply_to_list(struct local_client *client, int status,
154 1.1 haad const char *csid, const char *buf, int len);
155 1.2 dholland static if_type_t parse_cluster_interface(char *ifname);
156 1.2 dholland static if_type_t get_cluster_type(void);
157 1.1 haad
/*
 * Print clvmd's command-line help.
 *
 * prog: program name shown in the synopsis (normally argv[0]).
 * file: destination stream (stdout for -h, stderr for usage errors).
 *
 * The synopsis lists every option main() accepts. The "Available cluster
 * managers" line shows only the back-ends compiled in via USE_* macros.
 */
static void usage(char *prog, FILE *file)
{
	fprintf(file, "Usage:\n");
	fprintf(file, "%s [-V] [-h] [-d[<level>]] [-C] [-R] [-t<secs>] [-T<secs>] [-I<cmgr>]\n", prog);
	fprintf(file, "\n");
	fprintf(file, "   -V       Show version of clvmd\n");
	fprintf(file, "   -h       Show this help information\n");
	fprintf(file, "   -d       Set debug level\n");
	fprintf(file, "            If starting clvmd then don't fork, run in the foreground\n");
	fprintf(file, "   -R       Tell all running clvmds in the cluster to reload their device cache\n");
	fprintf(file, "   -C       Sets debug level (from -d) on all clvmd instances clusterwide\n");
	fprintf(file, "   -t<secs> Command timeout (default 60 seconds)\n");
	fprintf(file, "   -T<secs> Startup timeout (default none)\n");
	fprintf(file, "   -I<cmgr> Cluster manager (default: auto)\n");
	fprintf(file, "            Available cluster managers: ");
#ifdef USE_COROSYNC
	fprintf(file, "corosync ");
#endif
#ifdef USE_CMAN
	fprintf(file, "cman ");
#endif
#ifdef USE_OPENAIS
	fprintf(file, "openais ");
#endif
#ifdef USE_GULM
	fprintf(file, "gulm ");
#endif
	fprintf(file, "\n");
}
187 1.1 haad
188 1.1 haad /* Called to signal the parent how well we got on during initialisation */
189 1.1 haad static void child_init_signal(int status)
190 1.1 haad {
191 1.1 haad if (child_pipe[1]) {
192 1.1 haad write(child_pipe[1], &status, sizeof(status));
193 1.1 haad close(child_pipe[1]);
194 1.1 haad }
195 1.1 haad if (status)
196 1.1 haad exit(status);
197 1.1 haad }
198 1.1 haad
199 1.1 haad
200 1.1 haad void debuglog(const char *fmt, ...)
201 1.1 haad {
202 1.1 haad time_t P;
203 1.1 haad va_list ap;
204 1.1 haad static int syslog_init = 0;
205 1.1 haad
206 1.1 haad if (debug == DEBUG_STDERR) {
207 1.1 haad va_start(ap,fmt);
208 1.1 haad time(&P);
209 1.1 haad fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 );
210 1.1 haad vfprintf(stderr, fmt, ap);
211 1.1 haad va_end(ap);
212 1.1 haad }
213 1.1 haad if (debug == DEBUG_SYSLOG) {
214 1.1 haad if (!syslog_init) {
215 1.1 haad openlog("clvmd", LOG_PID, LOG_DAEMON);
216 1.1 haad syslog_init = 1;
217 1.1 haad }
218 1.1 haad
219 1.1 haad va_start(ap,fmt);
220 1.1 haad vsyslog(LOG_DEBUG, fmt, ap);
221 1.1 haad va_end(ap);
222 1.1 haad }
223 1.1 haad }
224 1.1 haad
225 1.1 haad static const char *decode_cmd(unsigned char cmdl)
226 1.1 haad {
227 1.1 haad static char buf[128];
228 1.1 haad const char *command;
229 1.1 haad
230 1.1 haad switch (cmdl) {
231 1.2 dholland case CLVMD_CMD_TEST:
232 1.2 dholland command = "TEST";
233 1.1 haad break;
234 1.2 dholland case CLVMD_CMD_LOCK_VG:
235 1.2 dholland command = "LOCK_VG";
236 1.1 haad break;
237 1.2 dholland case CLVMD_CMD_LOCK_LV:
238 1.2 dholland command = "LOCK_LV";
239 1.1 haad break;
240 1.2 dholland case CLVMD_CMD_REFRESH:
241 1.2 dholland command = "REFRESH";
242 1.1 haad break;
243 1.2 dholland case CLVMD_CMD_SET_DEBUG:
244 1.2 dholland command = "SET_DEBUG";
245 1.1 haad break;
246 1.2 dholland case CLVMD_CMD_GET_CLUSTERNAME:
247 1.1 haad command = "GET_CLUSTERNAME";
248 1.1 haad break;
249 1.2 dholland case CLVMD_CMD_VG_BACKUP:
250 1.2 dholland command = "VG_BACKUP";
251 1.2 dholland break;
252 1.2 dholland case CLVMD_CMD_REPLY:
253 1.2 dholland command = "REPLY";
254 1.1 haad break;
255 1.2 dholland case CLVMD_CMD_VERSION:
256 1.2 dholland command = "VERSION";
257 1.1 haad break;
258 1.2 dholland case CLVMD_CMD_GOAWAY:
259 1.2 dholland command = "GOAWAY";
260 1.1 haad break;
261 1.2 dholland case CLVMD_CMD_LOCK:
262 1.2 dholland command = "LOCK";
263 1.1 haad break;
264 1.2 dholland case CLVMD_CMD_UNLOCK:
265 1.2 dholland command = "UNLOCK";
266 1.1 haad break;
267 1.2 dholland case CLVMD_CMD_LOCK_QUERY:
268 1.2 dholland command = "LOCK_QUERY";
269 1.1 haad break;
270 1.2 dholland default:
271 1.2 dholland command = "unknown";
272 1.1 haad break;
273 1.1 haad }
274 1.1 haad
275 1.1 haad sprintf(buf, "%s (0x%x)", command, cmdl);
276 1.1 haad
277 1.1 haad return buf;
278 1.1 haad }
279 1.1 haad
280 1.1 haad int main(int argc, char *argv[])
281 1.1 haad {
282 1.1 haad int local_sock;
283 1.1 haad struct local_client *newfd;
284 1.1 haad struct utsname nodeinfo;
285 1.1 haad signed char opt;
286 1.1 haad int cmd_timeout = DEFAULT_CMD_TIMEOUT;
287 1.1 haad int start_timeout = 0;
288 1.2 dholland if_type_t cluster_iface = IF_AUTO;
289 1.1 haad sigset_t ss;
290 1.1 haad int using_gulm = 0;
291 1.1 haad int debug_opt = 0;
292 1.1 haad int clusterwide_opt = 0;
293 1.1 haad
294 1.1 haad /* Deal with command-line arguments */
295 1.1 haad opterr = 0;
296 1.1 haad optind = 0;
297 1.2 dholland while ((opt = getopt(argc, argv, "?vVhd::t:RT:CI:")) != EOF) {
298 1.1 haad switch (opt) {
299 1.1 haad case 'h':
300 1.1 haad usage(argv[0], stdout);
301 1.1 haad exit(0);
302 1.1 haad
303 1.1 haad case '?':
304 1.1 haad usage(argv[0], stderr);
305 1.1 haad exit(0);
306 1.1 haad
307 1.1 haad case 'R':
308 1.2 dholland return refresh_clvmd()==1?0:1;
309 1.1 haad
310 1.1 haad case 'C':
311 1.1 haad clusterwide_opt = 1;
312 1.1 haad break;
313 1.1 haad
314 1.1 haad case 'd':
315 1.1 haad debug_opt = 1;
316 1.1 haad if (optarg)
317 1.1 haad debug = atoi(optarg);
318 1.1 haad else
319 1.1 haad debug = DEBUG_STDERR;
320 1.1 haad break;
321 1.1 haad
322 1.1 haad case 't':
323 1.1 haad cmd_timeout = atoi(optarg);
324 1.1 haad if (!cmd_timeout) {
325 1.1 haad fprintf(stderr, "command timeout is invalid\n");
326 1.1 haad usage(argv[0], stderr);
327 1.1 haad exit(1);
328 1.1 haad }
329 1.1 haad break;
330 1.2 dholland case 'I':
331 1.2 dholland cluster_iface = parse_cluster_interface(optarg);
332 1.2 dholland break;
333 1.1 haad case 'T':
334 1.1 haad start_timeout = atoi(optarg);
335 1.1 haad if (start_timeout <= 0) {
336 1.1 haad fprintf(stderr, "startup timeout is invalid\n");
337 1.1 haad usage(argv[0], stderr);
338 1.1 haad exit(1);
339 1.1 haad }
340 1.1 haad break;
341 1.1 haad
342 1.1 haad case 'V':
343 1.1 haad printf("Cluster LVM daemon version: %s\n", LVM_VERSION);
344 1.1 haad printf("Protocol version: %d.%d.%d\n",
345 1.1 haad CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION,
346 1.1 haad CLVMD_PATCH_VERSION);
347 1.1 haad exit(1);
348 1.1 haad break;
349 1.1 haad
350 1.1 haad }
351 1.1 haad }
352 1.1 haad
353 1.1 haad /* Setting debug options on an existing clvmd */
354 1.1 haad if (debug_opt && !check_local_clvmd()) {
355 1.1 haad
356 1.1 haad /* Sending to stderr makes no sense for a detached daemon */
357 1.1 haad if (debug == DEBUG_STDERR)
358 1.1 haad debug = DEBUG_SYSLOG;
359 1.2 dholland return debug_clvmd(debug, clusterwide_opt)==1?0:1;
360 1.1 haad }
361 1.1 haad
362 1.1 haad /* Fork into the background (unless requested not to) */
363 1.1 haad if (debug != DEBUG_STDERR) {
364 1.1 haad be_daemon(start_timeout);
365 1.1 haad }
366 1.1 haad
367 1.1 haad DEBUGLOG("CLVMD started\n");
368 1.1 haad
369 1.1 haad /* Open the Unix socket we listen for commands on.
370 1.1 haad We do this before opening the cluster socket so that
371 1.1 haad potential clients will block rather than error if we are running
372 1.1 haad but the cluster is not ready yet */
373 1.1 haad local_sock = open_local_sock();
374 1.1 haad if (local_sock < 0)
375 1.1 haad child_init_signal(DFAIL_LOCAL_SOCK);
376 1.1 haad
377 1.1 haad /* Set up signal handlers, USR1 is for cluster change notifications (in cman)
378 1.1 haad USR2 causes child threads to exit.
379 1.1 haad HUP causes gulm version to re-read nodes list from CCS.
380 1.1 haad PIPE should be ignored */
381 1.1 haad signal(SIGUSR2, sigusr2_handler);
382 1.1 haad signal(SIGHUP, sighup_handler);
383 1.1 haad signal(SIGPIPE, SIG_IGN);
384 1.1 haad
385 1.2 dholland /* Block SIGUSR2/SIGINT/SIGTERM in process */
386 1.1 haad sigemptyset(&ss);
387 1.1 haad sigaddset(&ss, SIGUSR2);
388 1.2 dholland sigaddset(&ss, SIGINT);
389 1.2 dholland sigaddset(&ss, SIGTERM);
390 1.1 haad sigprocmask(SIG_BLOCK, &ss, NULL);
391 1.1 haad
392 1.1 haad /* Initialise the LVM thread variables */
393 1.1 haad dm_list_init(&lvm_cmd_head);
394 1.1 haad pthread_mutex_init(&lvm_thread_mutex, NULL);
395 1.1 haad pthread_cond_init(&lvm_thread_cond, NULL);
396 1.1 haad pthread_mutex_init(&lvm_start_mutex, NULL);
397 1.1 haad init_lvhash();
398 1.1 haad
399 1.1 haad /* Start the cluster interface */
400 1.2 dholland if (cluster_iface == IF_AUTO)
401 1.2 dholland cluster_iface = get_cluster_type();
402 1.2 dholland
403 1.1 haad #ifdef USE_CMAN
404 1.2 dholland if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) && (clops = init_cman_cluster())) {
405 1.1 haad max_csid_len = CMAN_MAX_CSID_LEN;
406 1.1 haad max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE;
407 1.1 haad max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN;
408 1.1 haad syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN");
409 1.1 haad }
410 1.1 haad #endif
411 1.1 haad #ifdef USE_GULM
412 1.1 haad if (!clops)
413 1.2 dholland if ((cluster_iface == IF_AUTO || cluster_iface == IF_GULM) && (clops = init_gulm_cluster())) {
414 1.1 haad max_csid_len = GULM_MAX_CSID_LEN;
415 1.1 haad max_cluster_message = GULM_MAX_CLUSTER_MESSAGE;
416 1.1 haad max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN;
417 1.1 haad using_gulm = 1;
418 1.1 haad syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM");
419 1.1 haad }
420 1.1 haad #endif
421 1.2 dholland #ifdef USE_COROSYNC
422 1.2 dholland if (!clops)
423 1.2 dholland if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) && (clops = init_corosync_cluster()))) {
424 1.2 dholland max_csid_len = COROSYNC_CSID_LEN;
425 1.2 dholland max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE;
426 1.2 dholland max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN;
427 1.2 dholland syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync");
428 1.2 dholland }
429 1.2 dholland #endif
430 1.1 haad #ifdef USE_OPENAIS
431 1.1 haad if (!clops)
432 1.2 dholland if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) && (clops = init_openais_cluster())) {
433 1.1 haad max_csid_len = OPENAIS_CSID_LEN;
434 1.1 haad max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE;
435 1.1 haad max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN;
436 1.1 haad syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS");
437 1.1 haad }
438 1.1 haad #endif
439 1.1 haad
440 1.1 haad if (!clops) {
441 1.1 haad DEBUGLOG("Can't initialise cluster interface\n");
442 1.1 haad log_error("Can't initialise cluster interface\n");
443 1.1 haad child_init_signal(DFAIL_CLUSTER_IF);
444 1.1 haad }
445 1.1 haad DEBUGLOG("Cluster ready, doing some more initialisation\n");
446 1.1 haad
447 1.1 haad /* Save our CSID */
448 1.1 haad uname(&nodeinfo);
449 1.1 haad clops->get_our_csid(our_csid);
450 1.1 haad
451 1.1 haad /* Initialise the FD list head */
452 1.1 haad local_client_head.fd = clops->get_main_cluster_fd();
453 1.1 haad local_client_head.type = CLUSTER_MAIN_SOCK;
454 1.1 haad local_client_head.callback = clops->cluster_fd_callback;
455 1.1 haad
456 1.1 haad /* Add the local socket to the list */
457 1.1 haad newfd = malloc(sizeof(struct local_client));
458 1.1 haad if (!newfd)
459 1.1 haad child_init_signal(DFAIL_MALLOC);
460 1.1 haad
461 1.1 haad newfd->fd = local_sock;
462 1.1 haad newfd->removeme = 0;
463 1.1 haad newfd->type = LOCAL_RENDEZVOUS;
464 1.1 haad newfd->callback = local_rendezvous_callback;
465 1.1 haad newfd->next = local_client_head.next;
466 1.1 haad local_client_head.next = newfd;
467 1.1 haad
468 1.1 haad /* This needs to be started after cluster initialisation
469 1.1 haad as it may need to take out locks */
470 1.1 haad DEBUGLOG("starting LVM thread\n");
471 1.2 dholland
472 1.2 dholland /* Don't let anyone else to do work until we are started */
473 1.2 dholland pthread_mutex_lock(&lvm_start_mutex);
474 1.2 dholland pthread_create(&lvm_thread, NULL, (lvm_pthread_fn_t*)lvm_thread_fn,
475 1.1 haad (void *)(long)using_gulm);
476 1.1 haad
477 1.1 haad /* Tell the rest of the cluster our version number */
478 1.1 haad /* CMAN can do this immediately, gulm needs to wait until
479 1.1 haad the core initialisation has finished and the node list
480 1.1 haad has been gathered */
481 1.1 haad if (clops->cluster_init_completed)
482 1.1 haad clops->cluster_init_completed();
483 1.1 haad
484 1.1 haad DEBUGLOG("clvmd ready for work\n");
485 1.1 haad child_init_signal(SUCCESS);
486 1.1 haad
487 1.1 haad /* Try to shutdown neatly */
488 1.1 haad signal(SIGTERM, sigterm_handler);
489 1.1 haad signal(SIGINT, sigterm_handler);
490 1.1 haad
491 1.1 haad /* Do some work */
492 1.1 haad main_loop(local_sock, cmd_timeout);
493 1.1 haad
494 1.2 dholland destroy_lvm();
495 1.2 dholland
496 1.1 haad return 0;
497 1.1 haad }
498 1.1 haad
/*
 * Hook for cluster back-ends that finish initialising asynchronously.
 * GuLM, for example, must gather its node list before this runs. The hook
 * broadcasts this node's clvmd protocol version to the rest of the cluster.
 */
void clvmd_cluster_init_completed(void)
{
	send_version_message();
}
505 1.1 haad
506 1.1 haad /* Data on a connected socket */
507 1.1 haad static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
508 1.1 haad const char *csid,
509 1.1 haad struct local_client **new_client)
510 1.1 haad {
511 1.1 haad *new_client = NULL;
512 1.1 haad return read_from_local_sock(thisfd);
513 1.1 haad }
514 1.1 haad
515 1.1 haad /* Data on a connected socket */
516 1.1 haad static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
517 1.1 haad int len, const char *csid,
518 1.1 haad struct local_client **new_client)
519 1.1 haad {
520 1.1 haad /* Someone connected to our local socket, accept it. */
521 1.1 haad
522 1.1 haad struct sockaddr_un socka;
523 1.1 haad struct local_client *newfd;
524 1.1 haad socklen_t sl = sizeof(socka);
525 1.1 haad int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl);
526 1.1 haad
527 1.1 haad if (client_fd == -1 && errno == EINTR)
528 1.1 haad return 1;
529 1.1 haad
530 1.1 haad if (client_fd >= 0) {
531 1.1 haad newfd = malloc(sizeof(struct local_client));
532 1.1 haad if (!newfd) {
533 1.1 haad close(client_fd);
534 1.1 haad return 1;
535 1.1 haad }
536 1.1 haad newfd->fd = client_fd;
537 1.1 haad newfd->type = LOCAL_SOCK;
538 1.1 haad newfd->xid = 0;
539 1.1 haad newfd->removeme = 0;
540 1.1 haad newfd->callback = local_sock_callback;
541 1.1 haad newfd->bits.localsock.replies = NULL;
542 1.1 haad newfd->bits.localsock.expected_replies = 0;
543 1.1 haad newfd->bits.localsock.cmd = NULL;
544 1.1 haad newfd->bits.localsock.in_progress = FALSE;
545 1.1 haad newfd->bits.localsock.sent_out = FALSE;
546 1.1 haad newfd->bits.localsock.threadid = 0;
547 1.1 haad newfd->bits.localsock.finished = 0;
548 1.1 haad newfd->bits.localsock.pipe_client = NULL;
549 1.1 haad newfd->bits.localsock.private = NULL;
550 1.1 haad newfd->bits.localsock.all_success = 1;
551 1.1 haad DEBUGLOG("Got new connection on fd %d\n", newfd->fd);
552 1.1 haad *new_client = newfd;
553 1.1 haad }
554 1.1 haad return 1;
555 1.1 haad }
556 1.1 haad
557 1.1 haad static int local_pipe_callback(struct local_client *thisfd, char *buf,
558 1.1 haad int maxlen, const char *csid,
559 1.1 haad struct local_client **new_client)
560 1.1 haad {
561 1.1 haad int len;
562 1.1 haad char buffer[PIPE_BUF];
563 1.1 haad struct local_client *sock_client = thisfd->bits.pipe.client;
564 1.1 haad int status = -1; /* in error by default */
565 1.1 haad
566 1.1 haad len = read(thisfd->fd, buffer, sizeof(int));
567 1.1 haad if (len == -1 && errno == EINTR)
568 1.1 haad return 1;
569 1.1 haad
570 1.1 haad if (len == sizeof(int)) {
571 1.1 haad memcpy(&status, buffer, sizeof(int));
572 1.1 haad }
573 1.1 haad
574 1.1 haad DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n",
575 1.1 haad thisfd->fd, len, status);
576 1.1 haad
577 1.1 haad /* EOF on pipe or an error, close it */
578 1.1 haad if (len <= 0) {
579 1.1 haad int jstat;
580 1.1 haad void *ret = &status;
581 1.1 haad close(thisfd->fd);
582 1.1 haad
583 1.1 haad /* Clear out the cross-link */
584 1.1 haad if (thisfd->bits.pipe.client != NULL)
585 1.1 haad thisfd->bits.pipe.client->bits.localsock.pipe_client =
586 1.1 haad NULL;
587 1.1 haad
588 1.1 haad /* Reap child thread */
589 1.1 haad if (thisfd->bits.pipe.threadid) {
590 1.1 haad jstat = pthread_join(thisfd->bits.pipe.threadid, &ret);
591 1.1 haad thisfd->bits.pipe.threadid = 0;
592 1.1 haad if (thisfd->bits.pipe.client != NULL)
593 1.1 haad thisfd->bits.pipe.client->bits.localsock.
594 1.1 haad threadid = 0;
595 1.1 haad }
596 1.1 haad return -1;
597 1.1 haad } else {
598 1.1 haad DEBUGLOG("background routine status was %d, sock_client=%p\n",
599 1.1 haad status, sock_client);
600 1.1 haad /* But has the client gone away ?? */
601 1.1 haad if (sock_client == NULL) {
602 1.1 haad DEBUGLOG
603 1.1 haad ("Got PIPE response for dead client, ignoring it\n");
604 1.1 haad } else {
605 1.1 haad /* If error then just return that code */
606 1.1 haad if (status)
607 1.1 haad send_local_reply(sock_client, status,
608 1.1 haad sock_client->fd);
609 1.1 haad else {
610 1.1 haad if (sock_client->bits.localsock.state ==
611 1.1 haad POST_COMMAND) {
612 1.1 haad send_local_reply(sock_client, 0,
613 1.1 haad sock_client->fd);
614 1.1 haad } else // PRE_COMMAND finished.
615 1.1 haad {
616 1.1 haad if (
617 1.1 haad (status =
618 1.1 haad distribute_command(sock_client)) !=
619 1.1 haad 0) send_local_reply(sock_client,
620 1.1 haad EFBIG,
621 1.1 haad sock_client->
622 1.1 haad fd);
623 1.1 haad }
624 1.1 haad }
625 1.1 haad }
626 1.1 haad }
627 1.1 haad return len;
628 1.1 haad }
629 1.1 haad
630 1.1 haad /* If a noed is up, look for it in the reply array, if it's not there then
631 1.1 haad add one with "ETIMEDOUT".
632 1.1 haad NOTE: This won't race with real replies because they happen in the same thread.
633 1.1 haad */
634 1.1 haad static void timedout_callback(struct local_client *client, const char *csid,
635 1.1 haad int node_up)
636 1.1 haad {
637 1.1 haad if (node_up) {
638 1.1 haad struct node_reply *reply;
639 1.1 haad char nodename[max_cluster_member_name_len];
640 1.1 haad
641 1.1 haad clops->name_from_csid(csid, nodename);
642 1.1 haad DEBUGLOG("Checking for a reply from %s\n", nodename);
643 1.1 haad pthread_mutex_lock(&client->bits.localsock.reply_mutex);
644 1.1 haad
645 1.1 haad reply = client->bits.localsock.replies;
646 1.1 haad while (reply && strcmp(reply->node, nodename) != 0) {
647 1.1 haad reply = reply->next;
648 1.1 haad }
649 1.1 haad
650 1.1 haad pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
651 1.1 haad
652 1.1 haad if (!reply) {
653 1.1 haad DEBUGLOG("Node %s timed-out\n", nodename);
654 1.1 haad add_reply_to_list(client, ETIMEDOUT, csid,
655 1.1 haad "Command timed out", 18);
656 1.1 haad }
657 1.1 haad }
658 1.1 haad }
659 1.1 haad
660 1.1 haad /* Called when the request has timed out on at least one node. We fill in
661 1.1 haad the remaining node entries with ETIMEDOUT and return.
662 1.1 haad
663 1.1 haad By the time we get here the node that caused
664 1.1 haad the timeout could have gone down, in which case we will never get the expected
665 1.1 haad number of replies that triggers the post command so we need to do it here
666 1.1 haad */
667 1.1 haad static void request_timed_out(struct local_client *client)
668 1.1 haad {
669 1.1 haad DEBUGLOG("Request timed-out. padding\n");
670 1.1 haad clops->cluster_do_node_callback(client, timedout_callback);
671 1.1 haad
672 1.1 haad if (client->bits.localsock.num_replies !=
673 1.1 haad client->bits.localsock.expected_replies) {
674 1.1 haad /* Post-process the command */
675 1.1 haad if (client->bits.localsock.threadid) {
676 1.1 haad pthread_mutex_lock(&client->bits.localsock.mutex);
677 1.1 haad client->bits.localsock.state = POST_COMMAND;
678 1.1 haad pthread_cond_signal(&client->bits.localsock.cond);
679 1.1 haad pthread_mutex_unlock(&client->bits.localsock.mutex);
680 1.1 haad }
681 1.1 haad }
682 1.1 haad }
683 1.1 haad
684 1.1 haad /* This is where the real work happens */
685 1.1 haad static void main_loop(int local_sock, int cmd_timeout)
686 1.1 haad {
687 1.1 haad DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);
688 1.1 haad
689 1.2 dholland sigset_t ss;
690 1.2 dholland sigemptyset(&ss);
691 1.2 dholland sigaddset(&ss, SIGINT);
692 1.2 dholland sigaddset(&ss, SIGTERM);
693 1.2 dholland pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
694 1.1 haad /* Main loop */
695 1.1 haad while (!quit) {
696 1.1 haad fd_set in;
697 1.1 haad int select_status;
698 1.1 haad struct local_client *thisfd;
699 1.1 haad struct timeval tv = { cmd_timeout, 0 };
700 1.1 haad int quorate = clops->is_quorate();
701 1.1 haad
702 1.1 haad /* Wait on the cluster FD and all local sockets/pipes */
703 1.1 haad local_client_head.fd = clops->get_main_cluster_fd();
704 1.1 haad FD_ZERO(&in);
705 1.1 haad for (thisfd = &local_client_head; thisfd != NULL;
706 1.1 haad thisfd = thisfd->next) {
707 1.1 haad
708 1.1 haad if (thisfd->removeme)
709 1.1 haad continue;
710 1.1 haad
711 1.1 haad /* if the cluster is not quorate then don't listen for new requests */
712 1.1 haad if ((thisfd->type != LOCAL_RENDEZVOUS &&
713 1.1 haad thisfd->type != LOCAL_SOCK) || quorate)
714 1.1 haad FD_SET(thisfd->fd, &in);
715 1.1 haad }
716 1.1 haad
717 1.1 haad select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);
718 1.1 haad
719 1.1 haad if (reread_config) {
720 1.1 haad int saved_errno = errno;
721 1.1 haad
722 1.1 haad reread_config = 0;
723 1.1 haad if (clops->reread_config)
724 1.1 haad clops->reread_config();
725 1.1 haad errno = saved_errno;
726 1.1 haad }
727 1.1 haad
728 1.1 haad if (select_status > 0) {
729 1.1 haad struct local_client *lastfd = NULL;
730 1.1 haad char csid[MAX_CSID_LEN];
731 1.1 haad char buf[max_cluster_message];
732 1.1 haad
733 1.1 haad for (thisfd = &local_client_head; thisfd != NULL;
734 1.1 haad thisfd = thisfd->next) {
735 1.1 haad
736 1.1 haad if (thisfd->removeme) {
737 1.1 haad struct local_client *free_fd;
738 1.1 haad lastfd->next = thisfd->next;
739 1.1 haad free_fd = thisfd;
740 1.1 haad thisfd = lastfd;
741 1.1 haad
742 1.1 haad DEBUGLOG("removeme set for fd %d\n", free_fd->fd);
743 1.1 haad
744 1.1 haad /* Queue cleanup, this also frees the client struct */
745 1.1 haad add_to_lvmqueue(free_fd, NULL, 0, NULL);
746 1.1 haad break;
747 1.1 haad }
748 1.1 haad
749 1.1 haad if (FD_ISSET(thisfd->fd, &in)) {
750 1.1 haad struct local_client *newfd = NULL;
751 1.1 haad int ret;
752 1.1 haad
753 1.1 haad /* Do callback */
754 1.1 haad ret =
755 1.1 haad thisfd->callback(thisfd, buf,
756 1.1 haad sizeof(buf), csid,
757 1.1 haad &newfd);
758 1.1 haad /* Ignore EAGAIN */
759 1.1 haad if (ret < 0 && (errno == EAGAIN ||
760 1.1 haad errno == EINTR)) continue;
761 1.1 haad
762 1.1 haad /* Got error or EOF: Remove it from the list safely */
763 1.1 haad if (ret <= 0) {
764 1.1 haad struct local_client *free_fd;
765 1.1 haad int type = thisfd->type;
766 1.1 haad
767 1.1 haad /* If the cluster socket shuts down, so do we */
768 1.1 haad if (type == CLUSTER_MAIN_SOCK ||
769 1.1 haad type == CLUSTER_INTERNAL)
770 1.1 haad goto closedown;
771 1.1 haad
772 1.1 haad DEBUGLOG("ret == %d, errno = %d. removing client\n",
773 1.1 haad ret, errno);
774 1.1 haad lastfd->next = thisfd->next;
775 1.1 haad free_fd = thisfd;
776 1.1 haad thisfd = lastfd;
777 1.1 haad close(free_fd->fd);
778 1.1 haad
779 1.1 haad /* Queue cleanup, this also frees the client struct */
780 1.1 haad add_to_lvmqueue(free_fd, NULL, 0, NULL);
781 1.1 haad break;
782 1.1 haad }
783 1.1 haad
784 1.1 haad /* New client...simply add it to the list */
785 1.1 haad if (newfd) {
786 1.1 haad newfd->next = thisfd->next;
787 1.1 haad thisfd->next = newfd;
788 1.1 haad break;
789 1.1 haad }
790 1.1 haad }
791 1.1 haad lastfd = thisfd;
792 1.1 haad }
793 1.1 haad }
794 1.1 haad
795 1.1 haad /* Select timed out. Check for clients that have been waiting too long for a response */
796 1.1 haad if (select_status == 0) {
797 1.1 haad time_t the_time = time(NULL);
798 1.1 haad
799 1.1 haad for (thisfd = &local_client_head; thisfd != NULL;
800 1.1 haad thisfd = thisfd->next) {
801 1.1 haad if (thisfd->type == LOCAL_SOCK
802 1.1 haad && thisfd->bits.localsock.sent_out
803 1.1 haad && thisfd->bits.localsock.sent_time +
804 1.1 haad cmd_timeout < the_time
805 1.1 haad && thisfd->bits.localsock.
806 1.1 haad expected_replies !=
807 1.1 haad thisfd->bits.localsock.num_replies) {
808 1.1 haad /* Send timed out message + replies we already have */
809 1.1 haad DEBUGLOG
810 1.1 haad ("Request timed-out (send: %ld, now: %ld)\n",
811 1.1 haad thisfd->bits.localsock.sent_time,
812 1.1 haad the_time);
813 1.1 haad
814 1.1 haad thisfd->bits.localsock.all_success = 0;
815 1.1 haad
816 1.1 haad request_timed_out(thisfd);
817 1.1 haad }
818 1.1 haad }
819 1.1 haad }
820 1.1 haad if (select_status < 0) {
821 1.1 haad if (errno == EINTR)
822 1.1 haad continue;
823 1.1 haad
824 1.1 haad #ifdef DEBUG
825 1.1 haad perror("select error");
826 1.1 haad exit(-1);
827 1.1 haad #endif
828 1.1 haad }
829 1.1 haad }
830 1.1 haad
831 1.1 haad closedown:
832 1.1 haad clops->cluster_closedown();
833 1.1 haad close(local_sock);
834 1.1 haad }
835 1.1 haad
836 1.1 haad static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout)
837 1.1 haad {
838 1.1 haad int child_status;
839 1.1 haad int sstat;
840 1.1 haad fd_set fds;
841 1.1 haad struct timeval tv = {timeout, 0};
842 1.1 haad
843 1.1 haad FD_ZERO(&fds);
844 1.1 haad FD_SET(c_pipe, &fds);
845 1.1 haad
846 1.1 haad sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL);
847 1.1 haad if (sstat == 0) {
848 1.1 haad fprintf(stderr, "clvmd startup timed out\n");
849 1.1 haad exit(DFAIL_TIMEOUT);
850 1.1 haad }
851 1.1 haad if (sstat == 1) {
852 1.1 haad if (read(c_pipe, &child_status, sizeof(child_status)) !=
853 1.1 haad sizeof(child_status)) {
854 1.1 haad
855 1.1 haad fprintf(stderr, "clvmd failed in initialisation\n");
856 1.1 haad exit(DFAIL_INIT);
857 1.1 haad }
858 1.1 haad else {
859 1.1 haad switch (child_status) {
860 1.1 haad case SUCCESS:
861 1.1 haad break;
862 1.1 haad case DFAIL_INIT:
863 1.1 haad fprintf(stderr, "clvmd failed in initialisation\n");
864 1.1 haad break;
865 1.1 haad case DFAIL_LOCAL_SOCK:
866 1.1 haad fprintf(stderr, "clvmd could not create local socket\n");
867 1.1 haad fprintf(stderr, "Another clvmd is probably already running\n");
868 1.1 haad break;
869 1.1 haad case DFAIL_CLUSTER_IF:
870 1.1 haad fprintf(stderr, "clvmd could not connect to cluster manager\n");
871 1.1 haad fprintf(stderr, "Consult syslog for more information\n");
872 1.1 haad break;
873 1.1 haad case DFAIL_MALLOC:
874 1.1 haad fprintf(stderr, "clvmd failed, not enough memory\n");
875 1.1 haad break;
876 1.1 haad default:
877 1.1 haad fprintf(stderr, "clvmd failed, error was %d\n", child_status);
878 1.1 haad break;
879 1.1 haad }
880 1.1 haad exit(child_status);
881 1.1 haad }
882 1.1 haad }
883 1.1 haad fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno));
884 1.1 haad exit(DFAIL_INIT);
885 1.1 haad }
886 1.1 haad
887 1.1 haad /*
888 1.1 haad * Fork into the background and detach from our parent process.
889 1.1 haad * In the interests of user-friendliness we wait for the daemon
890 1.1 haad * to complete initialisation before returning its status
891 1.1 haad * the the user.
892 1.1 haad */
893 1.1 haad static void be_daemon(int timeout)
894 1.1 haad {
895 1.1 haad pid_t pid;
896 1.1 haad int devnull = open("/dev/null", O_RDWR);
897 1.1 haad if (devnull == -1) {
898 1.1 haad perror("Can't open /dev/null");
899 1.1 haad exit(3);
900 1.1 haad }
901 1.1 haad
902 1.1 haad pipe(child_pipe);
903 1.1 haad
904 1.1 haad switch (pid = fork()) {
905 1.1 haad case -1:
906 1.1 haad perror("clvmd: can't fork");
907 1.1 haad exit(2);
908 1.1 haad
909 1.1 haad case 0: /* Child */
910 1.1 haad close(child_pipe[0]);
911 1.1 haad break;
912 1.1 haad
913 1.1 haad default: /* Parent */
914 1.1 haad close(child_pipe[1]);
915 1.1 haad wait_for_child(child_pipe[0], timeout);
916 1.1 haad }
917 1.1 haad
918 1.1 haad /* Detach ourself from the calling environment */
919 1.1 haad if (close(0) || close(1) || close(2)) {
920 1.1 haad perror("Error closing terminal FDs");
921 1.1 haad exit(4);
922 1.1 haad }
923 1.1 haad setsid();
924 1.1 haad
925 1.1 haad if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0
926 1.1 haad || dup2(devnull, 2) < 0) {
927 1.2 dholland int e = errno;
928 1.1 haad perror("Error setting terminal FDs to /dev/null");
929 1.2 dholland log_error("Error setting terminal FDs to /dev/null: %s", strerror(e));
930 1.1 haad exit(5);
931 1.1 haad }
932 1.1 haad if (chdir("/")) {
933 1.2 dholland log_error("Error setting current directory to /: %s", strerror(e));
934 1.1 haad exit(6);
935 1.1 haad }
936 1.1 haad
937 1.1 haad }
938 1.1 haad
939 1.1 haad /* Called when we have a read from the local socket.
940 1.1 haad was in the main loop but it's grown up and is a big girl now */
941 1.1 haad static int read_from_local_sock(struct local_client *thisfd)
942 1.1 haad {
943 1.1 haad int len;
944 1.1 haad int argslen;
945 1.1 haad int missing_len;
946 1.1 haad char buffer[PIPE_BUF];
947 1.1 haad
948 1.1 haad len = read(thisfd->fd, buffer, sizeof(buffer));
949 1.1 haad if (len == -1 && errno == EINTR)
950 1.1 haad return 1;
951 1.1 haad
952 1.1 haad DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len);
953 1.1 haad
954 1.1 haad /* EOF or error on socket */
955 1.1 haad if (len <= 0) {
956 1.1 haad int *status;
957 1.1 haad int jstat;
958 1.1 haad
959 1.1 haad DEBUGLOG("EOF on local socket: inprogress=%d\n",
960 1.1 haad thisfd->bits.localsock.in_progress);
961 1.1 haad
962 1.1 haad thisfd->bits.localsock.finished = 1;
963 1.1 haad
964 1.1 haad /* If the client went away in mid command then tidy up */
965 1.1 haad if (thisfd->bits.localsock.in_progress) {
966 1.1 haad pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2);
967 1.1 haad pthread_mutex_lock(&thisfd->bits.localsock.mutex);
968 1.1 haad thisfd->bits.localsock.state = POST_COMMAND;
969 1.1 haad pthread_cond_signal(&thisfd->bits.localsock.cond);
970 1.1 haad pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
971 1.1 haad
972 1.1 haad /* Free any unsent buffers */
973 1.1 haad free_reply(thisfd);
974 1.1 haad }
975 1.1 haad
976 1.1 haad /* Kill the subthread & free resources */
977 1.1 haad if (thisfd->bits.localsock.threadid) {
978 1.1 haad DEBUGLOG("Waiting for child thread\n");
979 1.1 haad pthread_mutex_lock(&thisfd->bits.localsock.mutex);
980 1.1 haad thisfd->bits.localsock.state = PRE_COMMAND;
981 1.1 haad pthread_cond_signal(&thisfd->bits.localsock.cond);
982 1.1 haad pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
983 1.1 haad
984 1.1 haad jstat =
985 1.1 haad pthread_join(thisfd->bits.localsock.threadid,
986 1.1 haad (void **) &status);
987 1.1 haad DEBUGLOG("Joined child thread\n");
988 1.1 haad
989 1.1 haad thisfd->bits.localsock.threadid = 0;
990 1.1 haad pthread_cond_destroy(&thisfd->bits.localsock.cond);
991 1.1 haad pthread_mutex_destroy(&thisfd->bits.localsock.mutex);
992 1.1 haad
993 1.1 haad /* Remove the pipe client */
994 1.1 haad if (thisfd->bits.localsock.pipe_client != NULL) {
995 1.1 haad struct local_client *newfd;
996 1.1 haad struct local_client *lastfd = NULL;
997 1.1 haad struct local_client *free_fd = NULL;
998 1.1 haad
999 1.1 haad close(thisfd->bits.localsock.pipe_client->fd); /* Close pipe */
1000 1.1 haad close(thisfd->bits.localsock.pipe);
1001 1.1 haad
1002 1.1 haad /* Remove pipe client */
1003 1.1 haad for (newfd = &local_client_head; newfd != NULL;
1004 1.1 haad newfd = newfd->next) {
1005 1.1 haad if (thisfd->bits.localsock.
1006 1.1 haad pipe_client == newfd) {
1007 1.1 haad thisfd->bits.localsock.
1008 1.1 haad pipe_client = NULL;
1009 1.1 haad
1010 1.1 haad lastfd->next = newfd->next;
1011 1.1 haad free_fd = newfd;
1012 1.1 haad newfd->next = lastfd;
1013 1.1 haad free(free_fd);
1014 1.1 haad break;
1015 1.1 haad }
1016 1.1 haad lastfd = newfd;
1017 1.1 haad }
1018 1.1 haad }
1019 1.1 haad }
1020 1.1 haad
1021 1.1 haad /* Free the command buffer */
1022 1.1 haad free(thisfd->bits.localsock.cmd);
1023 1.1 haad
1024 1.1 haad /* Clear out the cross-link */
1025 1.1 haad if (thisfd->bits.localsock.pipe_client != NULL)
1026 1.1 haad thisfd->bits.localsock.pipe_client->bits.pipe.client =
1027 1.1 haad NULL;
1028 1.1 haad
1029 1.1 haad close(thisfd->fd);
1030 1.1 haad return 0;
1031 1.1 haad } else {
1032 1.1 haad int comms_pipe[2];
1033 1.1 haad struct local_client *newfd;
1034 1.1 haad char csid[MAX_CSID_LEN];
1035 1.1 haad struct clvm_header *inheader;
1036 1.1 haad int status;
1037 1.1 haad
1038 1.1 haad inheader = (struct clvm_header *) buffer;
1039 1.1 haad
1040 1.1 haad /* Fill in the client ID */
1041 1.1 haad inheader->clientid = htonl(thisfd->fd);
1042 1.1 haad
1043 1.1 haad /* If we are already busy then return an error */
1044 1.1 haad if (thisfd->bits.localsock.in_progress) {
1045 1.1 haad struct clvm_header reply;
1046 1.1 haad reply.cmd = CLVMD_CMD_REPLY;
1047 1.1 haad reply.status = EBUSY;
1048 1.1 haad reply.arglen = 0;
1049 1.1 haad reply.flags = 0;
1050 1.1 haad send_message(&reply, sizeof(reply), our_csid,
1051 1.1 haad thisfd->fd,
1052 1.1 haad "Error sending EBUSY reply to local user");
1053 1.1 haad return len;
1054 1.1 haad }
1055 1.1 haad
1056 1.1 haad /* Free any old buffer space */
1057 1.1 haad free(thisfd->bits.localsock.cmd);
1058 1.1 haad
1059 1.1 haad /* See if we have the whole message */
1060 1.1 haad argslen =
1061 1.1 haad len - strlen(inheader->node) - sizeof(struct clvm_header);
1062 1.1 haad missing_len = inheader->arglen - argslen;
1063 1.1 haad
1064 1.1 haad if (missing_len < 0)
1065 1.1 haad missing_len = 0;
1066 1.1 haad
1067 1.1 haad /* Save the message */
1068 1.1 haad thisfd->bits.localsock.cmd = malloc(len + missing_len);
1069 1.1 haad
1070 1.1 haad if (!thisfd->bits.localsock.cmd) {
1071 1.1 haad struct clvm_header reply;
1072 1.1 haad reply.cmd = CLVMD_CMD_REPLY;
1073 1.1 haad reply.status = ENOMEM;
1074 1.1 haad reply.arglen = 0;
1075 1.1 haad reply.flags = 0;
1076 1.1 haad send_message(&reply, sizeof(reply), our_csid,
1077 1.1 haad thisfd->fd,
1078 1.1 haad "Error sending ENOMEM reply to local user");
1079 1.1 haad return 0;
1080 1.1 haad }
1081 1.1 haad memcpy(thisfd->bits.localsock.cmd, buffer, len);
1082 1.1 haad thisfd->bits.localsock.cmd_len = len + missing_len;
1083 1.1 haad inheader = (struct clvm_header *) thisfd->bits.localsock.cmd;
1084 1.1 haad
1085 1.1 haad /* If we don't have the full message then read the rest now */
1086 1.1 haad if (missing_len) {
1087 1.1 haad char *argptr =
1088 1.1 haad inheader->node + strlen(inheader->node) + 1;
1089 1.1 haad
1090 1.1 haad while (missing_len > 0 && len >= 0) {
1091 1.1 haad DEBUGLOG
1092 1.1 haad ("got %d bytes, need another %d (total %d)\n",
1093 1.1 haad argslen, missing_len, inheader->arglen);
1094 1.1 haad len = read(thisfd->fd, argptr + argslen,
1095 1.1 haad missing_len);
1096 1.1 haad if (len >= 0) {
1097 1.1 haad missing_len -= len;
1098 1.1 haad argslen += len;
1099 1.1 haad }
1100 1.1 haad }
1101 1.1 haad }
1102 1.1 haad
1103 1.1 haad /* Initialise and lock the mutex so the subthread will wait after
1104 1.1 haad finishing the PRE routine */
1105 1.1 haad if (!thisfd->bits.localsock.threadid) {
1106 1.1 haad pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL);
1107 1.1 haad pthread_cond_init(&thisfd->bits.localsock.cond, NULL);
1108 1.1 haad pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL);
1109 1.1 haad }
1110 1.1 haad
1111 1.1 haad /* Only run the command if all the cluster nodes are running CLVMD */
1112 1.1 haad if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) &&
1113 1.1 haad (check_all_clvmds_running(thisfd) == -1)) {
1114 1.1 haad thisfd->bits.localsock.expected_replies = 0;
1115 1.1 haad thisfd->bits.localsock.num_replies = 0;
1116 1.1 haad send_local_reply(thisfd, EHOSTDOWN, thisfd->fd);
1117 1.1 haad return len;
1118 1.1 haad }
1119 1.1 haad
1120 1.1 haad /* Check the node name for validity */
1121 1.1 haad if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) {
1122 1.1 haad /* Error, node is not in the cluster */
1123 1.1 haad struct clvm_header reply;
1124 1.1 haad DEBUGLOG("Unknown node: '%s'\n", inheader->node);
1125 1.1 haad
1126 1.1 haad reply.cmd = CLVMD_CMD_REPLY;
1127 1.1 haad reply.status = ENOENT;
1128 1.1 haad reply.flags = 0;
1129 1.1 haad reply.arglen = 0;
1130 1.1 haad send_message(&reply, sizeof(reply), our_csid,
1131 1.1 haad thisfd->fd,
1132 1.1 haad "Error sending ENOENT reply to local user");
1133 1.1 haad thisfd->bits.localsock.expected_replies = 0;
1134 1.1 haad thisfd->bits.localsock.num_replies = 0;
1135 1.1 haad thisfd->bits.localsock.in_progress = FALSE;
1136 1.1 haad thisfd->bits.localsock.sent_out = FALSE;
1137 1.1 haad return len;
1138 1.1 haad }
1139 1.1 haad
1140 1.1 haad /* If we already have a subthread then just signal it to start */
1141 1.1 haad if (thisfd->bits.localsock.threadid) {
1142 1.1 haad pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1143 1.1 haad thisfd->bits.localsock.state = PRE_COMMAND;
1144 1.1 haad pthread_cond_signal(&thisfd->bits.localsock.cond);
1145 1.1 haad pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1146 1.1 haad return len;
1147 1.1 haad }
1148 1.1 haad
1149 1.1 haad /* Create a pipe and add the reading end to our FD list */
1150 1.1 haad pipe(comms_pipe);
1151 1.1 haad newfd = malloc(sizeof(struct local_client));
1152 1.1 haad if (!newfd) {
1153 1.1 haad struct clvm_header reply;
1154 1.1 haad close(comms_pipe[0]);
1155 1.1 haad close(comms_pipe[1]);
1156 1.1 haad
1157 1.1 haad reply.cmd = CLVMD_CMD_REPLY;
1158 1.1 haad reply.status = ENOMEM;
1159 1.1 haad reply.arglen = 0;
1160 1.1 haad reply.flags = 0;
1161 1.1 haad send_message(&reply, sizeof(reply), our_csid,
1162 1.1 haad thisfd->fd,
1163 1.1 haad "Error sending ENOMEM reply to local user");
1164 1.1 haad return len;
1165 1.1 haad }
1166 1.1 haad DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0],
1167 1.1 haad comms_pipe[1]);
1168 1.1 haad newfd->fd = comms_pipe[0];
1169 1.1 haad newfd->removeme = 0;
1170 1.1 haad newfd->type = THREAD_PIPE;
1171 1.1 haad newfd->callback = local_pipe_callback;
1172 1.1 haad newfd->next = thisfd->next;
1173 1.1 haad newfd->bits.pipe.client = thisfd;
1174 1.1 haad newfd->bits.pipe.threadid = 0;
1175 1.1 haad thisfd->next = newfd;
1176 1.1 haad
1177 1.1 haad /* Store a cross link to the pipe */
1178 1.1 haad thisfd->bits.localsock.pipe_client = newfd;
1179 1.1 haad
1180 1.1 haad thisfd->bits.localsock.pipe = comms_pipe[1];
1181 1.1 haad
1182 1.1 haad /* Make sure the thread has a copy of it's own ID */
1183 1.1 haad newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid;
1184 1.1 haad
1185 1.1 haad /* Run the pre routine */
1186 1.1 haad thisfd->bits.localsock.in_progress = TRUE;
1187 1.1 haad thisfd->bits.localsock.state = PRE_COMMAND;
1188 1.1 haad DEBUGLOG("Creating pre&post thread\n");
1189 1.1 haad status = pthread_create(&thisfd->bits.localsock.threadid, NULL,
1190 1.1 haad pre_and_post_thread, thisfd);
1191 1.1 haad DEBUGLOG("Created pre&post thread, state = %d\n", status);
1192 1.1 haad }
1193 1.1 haad return len;
1194 1.1 haad }
1195 1.1 haad
1196 1.1 haad /* Add a file descriptor from the cluster or comms interface to
1197 1.1 haad our list of FDs for select
1198 1.1 haad */
1199 1.1 haad int add_client(struct local_client *new_client)
1200 1.1 haad {
1201 1.1 haad new_client->next = local_client_head.next;
1202 1.1 haad local_client_head.next = new_client;
1203 1.1 haad
1204 1.1 haad return 0;
1205 1.1 haad }
1206 1.1 haad
1207 1.1 haad /* Called when the pre-command has completed successfully - we
1208 1.1 haad now execute the real command on all the requested nodes */
1209 1.1 haad static int distribute_command(struct local_client *thisfd)
1210 1.1 haad {
1211 1.1 haad struct clvm_header *inheader =
1212 1.1 haad (struct clvm_header *) thisfd->bits.localsock.cmd;
1213 1.1 haad int len = thisfd->bits.localsock.cmd_len;
1214 1.1 haad
1215 1.1 haad thisfd->xid = global_xid++;
1216 1.1 haad DEBUGLOG("distribute command: XID = %d\n", thisfd->xid);
1217 1.1 haad
1218 1.1 haad /* Forward it to other nodes in the cluster if needed */
1219 1.1 haad if (!(inheader->flags & CLVMD_FLAG_LOCAL)) {
1220 1.1 haad /* if node is empty then do it on the whole cluster */
1221 1.1 haad if (inheader->node[0] == '\0') {
1222 1.1 haad thisfd->bits.localsock.expected_replies =
1223 1.1 haad clops->get_num_nodes();
1224 1.1 haad thisfd->bits.localsock.num_replies = 0;
1225 1.1 haad thisfd->bits.localsock.sent_time = time(NULL);
1226 1.1 haad thisfd->bits.localsock.in_progress = TRUE;
1227 1.1 haad thisfd->bits.localsock.sent_out = TRUE;
1228 1.1 haad
1229 1.1 haad /* Do it here first */
1230 1.1 haad add_to_lvmqueue(thisfd, inheader, len, NULL);
1231 1.1 haad
1232 1.1 haad DEBUGLOG("Sending message to all cluster nodes\n");
1233 1.1 haad inheader->xid = thisfd->xid;
1234 1.1 haad send_message(inheader, len, NULL, -1,
1235 1.1 haad "Error forwarding message to cluster");
1236 1.1 haad } else {
1237 1.1 haad /* Do it on a single node */
1238 1.1 haad char csid[MAX_CSID_LEN];
1239 1.1 haad
1240 1.1 haad if (clops->csid_from_name(csid, inheader->node)) {
1241 1.1 haad /* This has already been checked so should not happen */
1242 1.1 haad return 0;
1243 1.1 haad } else {
1244 1.1 haad /* OK, found a node... */
1245 1.1 haad thisfd->bits.localsock.expected_replies = 1;
1246 1.1 haad thisfd->bits.localsock.num_replies = 0;
1247 1.1 haad thisfd->bits.localsock.in_progress = TRUE;
1248 1.1 haad
1249 1.1 haad /* Are we the requested node ?? */
1250 1.1 haad if (memcmp(csid, our_csid, max_csid_len) == 0) {
1251 1.1 haad DEBUGLOG("Doing command on local node only\n");
1252 1.1 haad add_to_lvmqueue(thisfd, inheader, len, NULL);
1253 1.1 haad } else {
1254 1.1 haad DEBUGLOG("Sending message to single node: %s\n",
1255 1.1 haad inheader->node);
1256 1.1 haad inheader->xid = thisfd->xid;
1257 1.1 haad send_message(inheader, len,
1258 1.1 haad csid, -1,
1259 1.1 haad "Error forwarding message to cluster node");
1260 1.1 haad }
1261 1.1 haad }
1262 1.1 haad }
1263 1.1 haad } else {
1264 1.1 haad /* Local explicitly requested, ignore nodes */
1265 1.1 haad thisfd->bits.localsock.in_progress = TRUE;
1266 1.1 haad thisfd->bits.localsock.expected_replies = 1;
1267 1.1 haad thisfd->bits.localsock.num_replies = 0;
1268 1.1 haad add_to_lvmqueue(thisfd, inheader, len, NULL);
1269 1.1 haad }
1270 1.1 haad return 0;
1271 1.1 haad }
1272 1.1 haad
1273 1.1 haad /* Process a command from a remote node and return the result */
1274 1.1 haad static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
1275 1.1 haad const char *csid)
1276 1.1 haad {
1277 1.1 haad char *replyargs;
1278 1.1 haad char nodename[max_cluster_member_name_len];
1279 1.1 haad int replylen = 0;
1280 1.1 haad int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1281 1.1 haad int status;
1282 1.1 haad int msg_malloced = 0;
1283 1.1 haad
1284 1.1 haad /* Get the node name as we /may/ need it later */
1285 1.1 haad clops->name_from_csid(csid, nodename);
1286 1.1 haad
1287 1.1 haad DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n",
1288 1.1 haad decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename);
1289 1.1 haad
1290 1.1 haad /* Check for GOAWAY and sulk */
1291 1.1 haad if (msg->cmd == CLVMD_CMD_GOAWAY) {
1292 1.1 haad
1293 1.1 haad DEBUGLOG("Told to go away by %s\n", nodename);
1294 1.1 haad log_error("Told to go away by %s\n", nodename);
1295 1.1 haad exit(99);
1296 1.1 haad }
1297 1.1 haad
1298 1.1 haad /* Version check is internal - don't bother exposing it in
1299 1.1 haad clvmd-command.c */
1300 1.1 haad if (msg->cmd == CLVMD_CMD_VERSION) {
1301 1.1 haad int version_nums[3];
1302 1.1 haad char node[256];
1303 1.1 haad
1304 1.1 haad memcpy(version_nums, msg->args, sizeof(version_nums));
1305 1.1 haad
1306 1.1 haad clops->name_from_csid(csid, node);
1307 1.1 haad DEBUGLOG("Remote node %s is version %d.%d.%d\n",
1308 1.1 haad node,
1309 1.1 haad ntohl(version_nums[0]),
1310 1.1 haad ntohl(version_nums[1]), ntohl(version_nums[2]));
1311 1.1 haad
1312 1.1 haad if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) {
1313 1.1 haad struct clvm_header byebyemsg;
1314 1.1 haad DEBUGLOG
1315 1.1 haad ("Telling node %s to go away because of incompatible version number\n",
1316 1.1 haad node);
1317 1.1 haad log_notice
1318 1.1 haad ("Telling node %s to go away because of incompatible version number %d.%d.%d\n",
1319 1.1 haad node, ntohl(version_nums[0]),
1320 1.1 haad ntohl(version_nums[1]), ntohl(version_nums[2]));
1321 1.1 haad
1322 1.1 haad byebyemsg.cmd = CLVMD_CMD_GOAWAY;
1323 1.1 haad byebyemsg.status = 0;
1324 1.1 haad byebyemsg.flags = 0;
1325 1.1 haad byebyemsg.arglen = 0;
1326 1.1 haad byebyemsg.clientid = 0;
1327 1.1 haad clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg),
1328 1.1 haad our_csid,
1329 1.1 haad "Error Sending GOAWAY message");
1330 1.1 haad } else {
1331 1.1 haad clops->add_up_node(csid);
1332 1.1 haad }
1333 1.1 haad return;
1334 1.1 haad }
1335 1.1 haad
1336 1.1 haad /* Allocate a default reply buffer */
1337 1.1 haad replyargs = malloc(max_cluster_message - sizeof(struct clvm_header));
1338 1.1 haad
1339 1.1 haad if (replyargs != NULL) {
1340 1.1 haad /* Run the command */
1341 1.1 haad status =
1342 1.1 haad do_command(NULL, msg, msglen, &replyargs, buflen,
1343 1.1 haad &replylen);
1344 1.1 haad } else {
1345 1.1 haad status = ENOMEM;
1346 1.1 haad }
1347 1.1 haad
1348 1.1 haad /* If it wasn't a reply, then reply */
1349 1.1 haad if (msg->cmd != CLVMD_CMD_REPLY) {
1350 1.1 haad char *aggreply;
1351 1.1 haad
1352 1.1 haad aggreply =
1353 1.1 haad realloc(replyargs, replylen + sizeof(struct clvm_header));
1354 1.1 haad if (aggreply) {
1355 1.1 haad struct clvm_header *agghead =
1356 1.1 haad (struct clvm_header *) aggreply;
1357 1.1 haad
1358 1.1 haad replyargs = aggreply;
1359 1.1 haad /* Move it up so there's room for a header in front of the data */
1360 1.1 haad memmove(aggreply + offsetof(struct clvm_header, args),
1361 1.1 haad replyargs, replylen);
1362 1.1 haad
1363 1.1 haad agghead->xid = msg->xid;
1364 1.1 haad agghead->cmd = CLVMD_CMD_REPLY;
1365 1.1 haad agghead->status = status;
1366 1.1 haad agghead->flags = 0;
1367 1.1 haad agghead->clientid = msg->clientid;
1368 1.1 haad agghead->arglen = replylen;
1369 1.1 haad agghead->node[0] = '\0';
1370 1.1 haad send_message(aggreply,
1371 1.1 haad sizeof(struct clvm_header) +
1372 1.1 haad replylen, csid, fd,
1373 1.1 haad "Error sending command reply");
1374 1.1 haad } else {
1375 1.1 haad struct clvm_header head;
1376 1.1 haad
1377 1.1 haad DEBUGLOG("Error attempting to realloc return buffer\n");
1378 1.1 haad /* Return a failure response */
1379 1.1 haad head.cmd = CLVMD_CMD_REPLY;
1380 1.1 haad head.status = ENOMEM;
1381 1.1 haad head.flags = 0;
1382 1.1 haad head.clientid = msg->clientid;
1383 1.1 haad head.arglen = 0;
1384 1.1 haad head.node[0] = '\0';
1385 1.1 haad send_message(&head, sizeof(struct clvm_header), csid,
1386 1.1 haad fd, "Error sending ENOMEM command reply");
1387 1.1 haad return;
1388 1.1 haad }
1389 1.1 haad }
1390 1.1 haad
1391 1.1 haad /* Free buffer if it was malloced */
1392 1.1 haad if (msg_malloced) {
1393 1.1 haad free(msg);
1394 1.1 haad }
1395 1.1 haad free(replyargs);
1396 1.1 haad }
1397 1.1 haad
1398 1.1 haad /* Add a reply to a command to the list of replies for this client.
1399 1.1 haad If we have got a full set then send them to the waiting client down the local
1400 1.1 haad socket */
1401 1.1 haad static void add_reply_to_list(struct local_client *client, int status,
1402 1.1 haad const char *csid, const char *buf, int len)
1403 1.1 haad {
1404 1.1 haad struct node_reply *reply;
1405 1.1 haad
1406 1.1 haad pthread_mutex_lock(&client->bits.localsock.reply_mutex);
1407 1.1 haad
1408 1.1 haad /* Add it to the list of replies */
1409 1.1 haad reply = malloc(sizeof(struct node_reply));
1410 1.1 haad if (reply) {
1411 1.1 haad reply->status = status;
1412 1.1 haad clops->name_from_csid(csid, reply->node);
1413 1.1 haad DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len);
1414 1.1 haad
1415 1.1 haad if (len > 0) {
1416 1.1 haad reply->replymsg = malloc(len);
1417 1.1 haad if (!reply->replymsg) {
1418 1.1 haad reply->status = ENOMEM;
1419 1.1 haad } else {
1420 1.1 haad memcpy(reply->replymsg, buf, len);
1421 1.1 haad }
1422 1.1 haad } else {
1423 1.1 haad reply->replymsg = NULL;
1424 1.1 haad }
1425 1.1 haad /* Hook it onto the reply chain */
1426 1.1 haad reply->next = client->bits.localsock.replies;
1427 1.1 haad client->bits.localsock.replies = reply;
1428 1.1 haad } else {
1429 1.1 haad /* It's all gone horribly wrong... */
1430 1.1 haad pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1431 1.1 haad send_local_reply(client, ENOMEM, client->fd);
1432 1.1 haad return;
1433 1.1 haad }
1434 1.1 haad DEBUGLOG("Got %d replies, expecting: %d\n",
1435 1.1 haad client->bits.localsock.num_replies + 1,
1436 1.1 haad client->bits.localsock.expected_replies);
1437 1.1 haad
1438 1.1 haad /* If we have the whole lot then do the post-process */
1439 1.1 haad if (++client->bits.localsock.num_replies ==
1440 1.1 haad client->bits.localsock.expected_replies) {
1441 1.1 haad /* Post-process the command */
1442 1.1 haad if (client->bits.localsock.threadid) {
1443 1.1 haad pthread_mutex_lock(&client->bits.localsock.mutex);
1444 1.1 haad client->bits.localsock.state = POST_COMMAND;
1445 1.1 haad pthread_cond_signal(&client->bits.localsock.cond);
1446 1.1 haad pthread_mutex_unlock(&client->bits.localsock.mutex);
1447 1.1 haad }
1448 1.1 haad }
1449 1.1 haad pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1450 1.1 haad }
1451 1.1 haad
1452 1.1 haad /* This is the thread that runs the PRE and post commands for a particular connection */
1453 1.1 haad static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
1454 1.1 haad {
1455 1.1 haad struct local_client *client = (struct local_client *) arg;
1456 1.1 haad int status;
1457 1.1 haad int write_status;
1458 1.1 haad sigset_t ss;
1459 1.1 haad int pipe_fd = client->bits.localsock.pipe;
1460 1.1 haad
1461 1.1 haad DEBUGLOG("in sub thread: client = %p\n", client);
1462 1.1 haad
1463 1.1 haad /* Don't start until the LVM thread is ready */
1464 1.1 haad pthread_mutex_lock(&lvm_start_mutex);
1465 1.1 haad pthread_mutex_unlock(&lvm_start_mutex);
1466 1.1 haad DEBUGLOG("Sub thread ready for work.\n");
1467 1.1 haad
1468 1.1 haad /* Ignore SIGUSR1 (handled by master process) but enable
1469 1.1 haad SIGUSR2 (kills subthreads) */
1470 1.1 haad sigemptyset(&ss);
1471 1.1 haad sigaddset(&ss, SIGUSR1);
1472 1.1 haad pthread_sigmask(SIG_BLOCK, &ss, NULL);
1473 1.1 haad
1474 1.1 haad sigdelset(&ss, SIGUSR1);
1475 1.1 haad sigaddset(&ss, SIGUSR2);
1476 1.1 haad pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
1477 1.1 haad
1478 1.1 haad /* Loop around doing PRE and POST functions until the client goes away */
1479 1.1 haad while (!client->bits.localsock.finished) {
1480 1.1 haad /* Execute the code */
1481 1.1 haad status = do_pre_command(client);
1482 1.1 haad
1483 1.1 haad if (status)
1484 1.1 haad client->bits.localsock.all_success = 0;
1485 1.1 haad
1486 1.1 haad DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd);
1487 1.1 haad
1488 1.1 haad /* Tell the parent process we have finished this bit */
1489 1.1 haad do {
1490 1.1 haad write_status = write(pipe_fd, &status, sizeof(int));
1491 1.1 haad if (write_status == sizeof(int))
1492 1.1 haad break;
1493 1.1 haad if (write_status < 0 &&
1494 1.1 haad (errno == EINTR || errno == EAGAIN))
1495 1.1 haad continue;
1496 1.2 dholland log_error("Error sending to pipe: %s\n", strerror(errno));
1497 1.1 haad break;
1498 1.1 haad } while(1);
1499 1.1 haad
1500 1.1 haad if (status) {
1501 1.1 haad client->bits.localsock.state = POST_COMMAND;
1502 1.1 haad goto next_pre;
1503 1.1 haad }
1504 1.1 haad
1505 1.1 haad /* We may need to wait for the condition variable before running the post command */
1506 1.1 haad pthread_mutex_lock(&client->bits.localsock.mutex);
1507 1.1 haad DEBUGLOG("Waiting to do post command - state = %d\n",
1508 1.1 haad client->bits.localsock.state);
1509 1.1 haad
1510 1.1 haad if (client->bits.localsock.state != POST_COMMAND) {
1511 1.1 haad pthread_cond_wait(&client->bits.localsock.cond,
1512 1.1 haad &client->bits.localsock.mutex);
1513 1.1 haad }
1514 1.1 haad pthread_mutex_unlock(&client->bits.localsock.mutex);
1515 1.1 haad
1516 1.1 haad DEBUGLOG("Got post command condition...\n");
1517 1.1 haad
1518 1.1 haad /* POST function must always run, even if the client aborts */
1519 1.1 haad status = 0;
1520 1.1 haad do_post_command(client);
1521 1.1 haad
1522 1.1 haad do {
1523 1.1 haad write_status = write(pipe_fd, &status, sizeof(int));
1524 1.1 haad if (write_status == sizeof(int))
1525 1.1 haad break;
1526 1.1 haad if (write_status < 0 &&
1527 1.1 haad (errno == EINTR || errno == EAGAIN))
1528 1.1 haad continue;
1529 1.2 dholland log_error("Error sending to pipe: %s\n", strerror(errno));
1530 1.1 haad break;
1531 1.1 haad } while(1);
1532 1.1 haad next_pre:
1533 1.1 haad DEBUGLOG("Waiting for next pre command\n");
1534 1.1 haad
1535 1.1 haad pthread_mutex_lock(&client->bits.localsock.mutex);
1536 1.1 haad if (client->bits.localsock.state != PRE_COMMAND &&
1537 1.1 haad !client->bits.localsock.finished) {
1538 1.1 haad pthread_cond_wait(&client->bits.localsock.cond,
1539 1.1 haad &client->bits.localsock.mutex);
1540 1.1 haad }
1541 1.1 haad pthread_mutex_unlock(&client->bits.localsock.mutex);
1542 1.1 haad
1543 1.1 haad DEBUGLOG("Got pre command condition...\n");
1544 1.1 haad }
1545 1.1 haad DEBUGLOG("Subthread finished\n");
1546 1.1 haad pthread_exit((void *) 0);
1547 1.1 haad }
1548 1.1 haad
1549 1.1 haad /* Process a command on the local node and store the result */
1550 1.1 haad static int process_local_command(struct clvm_header *msg, int msglen,
1551 1.1 haad struct local_client *client,
1552 1.1 haad unsigned short xid)
1553 1.1 haad {
1554 1.1 haad char *replybuf = malloc(max_cluster_message);
1555 1.1 haad int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1556 1.1 haad int replylen = 0;
1557 1.1 haad int status;
1558 1.1 haad
1559 1.1 haad DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n",
1560 1.1 haad decode_cmd(msg->cmd), msg, msglen, client);
1561 1.1 haad
1562 1.1 haad if (replybuf == NULL)
1563 1.1 haad return -1;
1564 1.1 haad
1565 1.1 haad status = do_command(client, msg, msglen, &replybuf, buflen, &replylen);
1566 1.1 haad
1567 1.1 haad if (status)
1568 1.1 haad client->bits.localsock.all_success = 0;
1569 1.1 haad
1570 1.1 haad /* If we took too long then discard the reply */
1571 1.1 haad if (xid == client->xid) {
1572 1.1 haad add_reply_to_list(client, status, our_csid, replybuf, replylen);
1573 1.1 haad } else {
1574 1.1 haad DEBUGLOG
1575 1.1 haad ("Local command took too long, discarding xid %d, current is %d\n",
1576 1.1 haad xid, client->xid);
1577 1.1 haad }
1578 1.1 haad
1579 1.1 haad free(replybuf);
1580 1.1 haad return status;
1581 1.1 haad }
1582 1.1 haad
1583 1.1 haad static int process_reply(const struct clvm_header *msg, int msglen, const char *csid)
1584 1.1 haad {
1585 1.1 haad struct local_client *client = NULL;
1586 1.1 haad
1587 1.1 haad client = find_client(msg->clientid);
1588 1.1 haad if (!client) {
1589 1.1 haad DEBUGLOG("Got message for unknown client 0x%x\n",
1590 1.1 haad msg->clientid);
1591 1.1 haad log_error("Got message for unknown client 0x%x\n",
1592 1.1 haad msg->clientid);
1593 1.1 haad return -1;
1594 1.1 haad }
1595 1.1 haad
1596 1.1 haad if (msg->status)
1597 1.1 haad client->bits.localsock.all_success = 0;
1598 1.1 haad
1599 1.1 haad /* Gather replies together for this client id */
1600 1.1 haad if (msg->xid == client->xid) {
1601 1.1 haad add_reply_to_list(client, msg->status, csid, msg->args,
1602 1.1 haad msg->arglen);
1603 1.1 haad } else {
1604 1.1 haad DEBUGLOG("Discarding reply with old XID %d, current = %d\n",
1605 1.1 haad msg->xid, client->xid);
1606 1.1 haad }
1607 1.1 haad return 0;
1608 1.1 haad }
1609 1.1 haad
1610 1.1 haad /* Send an aggregated reply back to the client */
1611 1.1 haad static void send_local_reply(struct local_client *client, int status, int fd)
1612 1.1 haad {
1613 1.1 haad struct clvm_header *clientreply;
1614 1.1 haad struct node_reply *thisreply = client->bits.localsock.replies;
1615 1.1 haad char *replybuf;
1616 1.1 haad char *ptr;
1617 1.1 haad int message_len = 0;
1618 1.1 haad
1619 1.1 haad DEBUGLOG("Send local reply\n");
1620 1.1 haad
1621 1.1 haad /* Work out the total size of the reply */
1622 1.1 haad while (thisreply) {
1623 1.1 haad if (thisreply->replymsg)
1624 1.1 haad message_len += strlen(thisreply->replymsg) + 1;
1625 1.1 haad else
1626 1.1 haad message_len++;
1627 1.1 haad
1628 1.1 haad message_len += strlen(thisreply->node) + 1 + sizeof(int);
1629 1.1 haad
1630 1.1 haad thisreply = thisreply->next;
1631 1.1 haad }
1632 1.1 haad
1633 1.1 haad /* Add in the size of our header */
1634 1.1 haad message_len = message_len + sizeof(struct clvm_header) + 1;
1635 1.1 haad replybuf = malloc(message_len);
1636 1.1 haad
1637 1.1 haad clientreply = (struct clvm_header *) replybuf;
1638 1.1 haad clientreply->status = status;
1639 1.1 haad clientreply->cmd = CLVMD_CMD_REPLY;
1640 1.1 haad clientreply->node[0] = '\0';
1641 1.1 haad clientreply->flags = 0;
1642 1.1 haad
1643 1.1 haad ptr = clientreply->args;
1644 1.1 haad
1645 1.1 haad /* Add in all the replies, and free them as we go */
1646 1.1 haad thisreply = client->bits.localsock.replies;
1647 1.1 haad while (thisreply) {
1648 1.1 haad struct node_reply *tempreply = thisreply;
1649 1.1 haad
1650 1.1 haad strcpy(ptr, thisreply->node);
1651 1.1 haad ptr += strlen(thisreply->node) + 1;
1652 1.1 haad
1653 1.1 haad if (thisreply->status)
1654 1.1 haad clientreply->flags |= CLVMD_FLAG_NODEERRS;
1655 1.1 haad
1656 1.1 haad memcpy(ptr, &thisreply->status, sizeof(int));
1657 1.1 haad ptr += sizeof(int);
1658 1.1 haad
1659 1.1 haad if (thisreply->replymsg) {
1660 1.1 haad strcpy(ptr, thisreply->replymsg);
1661 1.1 haad ptr += strlen(thisreply->replymsg) + 1;
1662 1.1 haad } else {
1663 1.1 haad ptr[0] = '\0';
1664 1.1 haad ptr++;
1665 1.1 haad }
1666 1.1 haad thisreply = thisreply->next;
1667 1.1 haad
1668 1.1 haad free(tempreply->replymsg);
1669 1.1 haad free(tempreply);
1670 1.1 haad }
1671 1.1 haad
1672 1.1 haad /* Terminate with an empty node name */
1673 1.1 haad *ptr = '\0';
1674 1.1 haad
1675 1.1 haad clientreply->arglen = ptr - clientreply->args + 1;
1676 1.1 haad
1677 1.1 haad /* And send it */
1678 1.1 haad send_message(replybuf, message_len, our_csid, fd,
1679 1.1 haad "Error sending REPLY to client");
1680 1.1 haad free(replybuf);
1681 1.1 haad
1682 1.1 haad /* Reset comms variables */
1683 1.1 haad client->bits.localsock.replies = NULL;
1684 1.1 haad client->bits.localsock.expected_replies = 0;
1685 1.1 haad client->bits.localsock.in_progress = FALSE;
1686 1.1 haad client->bits.localsock.sent_out = FALSE;
1687 1.1 haad }
1688 1.1 haad
1689 1.1 haad /* Just free a reply chain baceuse it wasn't used. */
1690 1.1 haad static void free_reply(struct local_client *client)
1691 1.1 haad {
1692 1.1 haad /* Add in all the replies, and free them as we go */
1693 1.1 haad struct node_reply *thisreply = client->bits.localsock.replies;
1694 1.1 haad while (thisreply) {
1695 1.1 haad struct node_reply *tempreply = thisreply;
1696 1.1 haad
1697 1.1 haad thisreply = thisreply->next;
1698 1.1 haad
1699 1.1 haad free(tempreply->replymsg);
1700 1.1 haad free(tempreply);
1701 1.1 haad }
1702 1.1 haad client->bits.localsock.replies = NULL;
1703 1.1 haad }
1704 1.1 haad
1705 1.1 haad /* Send our version number to the cluster */
1706 1.1 haad static void send_version_message()
1707 1.1 haad {
1708 1.1 haad char message[sizeof(struct clvm_header) + sizeof(int) * 3];
1709 1.1 haad struct clvm_header *msg = (struct clvm_header *) message;
1710 1.1 haad int version_nums[3];
1711 1.1 haad
1712 1.1 haad msg->cmd = CLVMD_CMD_VERSION;
1713 1.1 haad msg->status = 0;
1714 1.1 haad msg->flags = 0;
1715 1.1 haad msg->clientid = 0;
1716 1.1 haad msg->arglen = sizeof(version_nums);
1717 1.1 haad
1718 1.1 haad version_nums[0] = htonl(CLVMD_MAJOR_VERSION);
1719 1.1 haad version_nums[1] = htonl(CLVMD_MINOR_VERSION);
1720 1.1 haad version_nums[2] = htonl(CLVMD_PATCH_VERSION);
1721 1.1 haad
1722 1.1 haad memcpy(&msg->args, version_nums, sizeof(version_nums));
1723 1.1 haad
1724 1.1 haad hton_clvm(msg);
1725 1.1 haad
1726 1.1 haad clops->cluster_send_message(message, sizeof(message), NULL,
1727 1.1 haad "Error Sending version number");
1728 1.1 haad }
1729 1.1 haad
1730 1.1 haad /* Send a message to either a local client or another server */
1731 1.1 haad static int send_message(void *buf, int msglen, const char *csid, int fd,
1732 1.1 haad const char *errtext)
1733 1.1 haad {
1734 1.1 haad int len = 0;
1735 1.1 haad int saved_errno = 0;
1736 1.1 haad struct timespec delay;
1737 1.1 haad struct timespec remtime;
1738 1.1 haad
1739 1.1 haad int retry_cnt = 0;
1740 1.1 haad
1741 1.1 haad /* Send remote messages down the cluster socket */
1742 1.1 haad if (csid == NULL || !ISLOCAL_CSID(csid)) {
1743 1.1 haad hton_clvm((struct clvm_header *) buf);
1744 1.1 haad return clops->cluster_send_message(buf, msglen, csid, errtext);
1745 1.1 haad } else {
1746 1.1 haad int ptr = 0;
1747 1.1 haad
1748 1.1 haad /* Make sure it all goes */
1749 1.1 haad do {
1750 1.1 haad if (retry_cnt > MAX_RETRIES)
1751 1.1 haad {
1752 1.1 haad errno = saved_errno;
1753 1.1 haad log_error("%s", errtext);
1754 1.1 haad errno = saved_errno;
1755 1.1 haad break;
1756 1.1 haad }
1757 1.1 haad
1758 1.1 haad len = write(fd, buf + ptr, msglen - ptr);
1759 1.1 haad
1760 1.1 haad if (len <= 0) {
1761 1.1 haad if (errno == EINTR)
1762 1.1 haad continue;
1763 1.1 haad if (errno == EAGAIN ||
1764 1.1 haad errno == EIO ||
1765 1.1 haad errno == ENOSPC) {
1766 1.1 haad saved_errno = errno;
1767 1.1 haad retry_cnt++;
1768 1.1 haad
1769 1.1 haad delay.tv_sec = 0;
1770 1.1 haad delay.tv_nsec = 100000;
1771 1.1 haad remtime.tv_sec = 0;
1772 1.1 haad remtime.tv_nsec = 0;
1773 1.1 haad (void) nanosleep (&delay, &remtime);
1774 1.1 haad
1775 1.1 haad continue;
1776 1.1 haad }
1777 1.1 haad log_error("%s", errtext);
1778 1.1 haad break;
1779 1.1 haad }
1780 1.1 haad ptr += len;
1781 1.1 haad } while (ptr < msglen);
1782 1.1 haad }
1783 1.1 haad return len;
1784 1.1 haad }
1785 1.1 haad
1786 1.1 haad static int process_work_item(struct lvm_thread_cmd *cmd)
1787 1.1 haad {
1788 1.1 haad /* If msg is NULL then this is a cleanup request */
1789 1.1 haad if (cmd->msg == NULL) {
1790 1.1 haad DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd);
1791 1.1 haad cmd_client_cleanup(cmd->client);
1792 1.1 haad free(cmd->client);
1793 1.1 haad return 0;
1794 1.1 haad }
1795 1.1 haad
1796 1.1 haad if (!cmd->remote) {
1797 1.1 haad DEBUGLOG("process_work_item: local\n");
1798 1.1 haad process_local_command(cmd->msg, cmd->msglen, cmd->client,
1799 1.1 haad cmd->xid);
1800 1.1 haad } else {
1801 1.1 haad DEBUGLOG("process_work_item: remote\n");
1802 1.1 haad process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd,
1803 1.1 haad cmd->csid);
1804 1.1 haad }
1805 1.1 haad return 0;
1806 1.1 haad }
1807 1.1 haad
1808 1.1 haad /*
1809 1.1 haad * Routine that runs in the "LVM thread".
1810 1.1 haad */
1811 1.2 dholland static void lvm_thread_fn(void *arg)
1812 1.1 haad {
1813 1.1 haad struct dm_list *cmdl, *tmp;
1814 1.1 haad sigset_t ss;
1815 1.1 haad int using_gulm = (int)(long)arg;
1816 1.1 haad
1817 1.1 haad DEBUGLOG("LVM thread function started\n");
1818 1.1 haad
1819 1.1 haad /* Ignore SIGUSR1 & 2 */
1820 1.1 haad sigemptyset(&ss);
1821 1.1 haad sigaddset(&ss, SIGUSR1);
1822 1.1 haad sigaddset(&ss, SIGUSR2);
1823 1.1 haad pthread_sigmask(SIG_BLOCK, &ss, NULL);
1824 1.1 haad
1825 1.1 haad /* Initialise the interface to liblvm */
1826 1.1 haad init_lvm(using_gulm);
1827 1.1 haad
1828 1.1 haad /* Allow others to get moving */
1829 1.1 haad pthread_mutex_unlock(&lvm_start_mutex);
1830 1.1 haad
1831 1.1 haad /* Now wait for some actual work */
1832 1.1 haad for (;;) {
1833 1.1 haad DEBUGLOG("LVM thread waiting for work\n");
1834 1.1 haad
1835 1.1 haad pthread_mutex_lock(&lvm_thread_mutex);
1836 1.1 haad if (dm_list_empty(&lvm_cmd_head))
1837 1.1 haad pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex);
1838 1.1 haad
1839 1.1 haad dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) {
1840 1.1 haad struct lvm_thread_cmd *cmd;
1841 1.1 haad
1842 1.1 haad cmd =
1843 1.1 haad dm_list_struct_base(cmdl, struct lvm_thread_cmd, list);
1844 1.1 haad dm_list_del(&cmd->list);
1845 1.1 haad pthread_mutex_unlock(&lvm_thread_mutex);
1846 1.1 haad
1847 1.1 haad process_work_item(cmd);
1848 1.1 haad free(cmd->msg);
1849 1.1 haad free(cmd);
1850 1.1 haad
1851 1.1 haad pthread_mutex_lock(&lvm_thread_mutex);
1852 1.1 haad }
1853 1.1 haad pthread_mutex_unlock(&lvm_thread_mutex);
1854 1.1 haad }
1855 1.1 haad }
1856 1.1 haad
1857 1.1 haad /* Pass down some work to the LVM thread */
1858 1.1 haad static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
1859 1.1 haad int msglen, const char *csid)
1860 1.1 haad {
1861 1.1 haad struct lvm_thread_cmd *cmd;
1862 1.1 haad
1863 1.1 haad cmd = malloc(sizeof(struct lvm_thread_cmd));
1864 1.1 haad if (!cmd)
1865 1.1 haad return ENOMEM;
1866 1.1 haad
1867 1.1 haad if (msglen) {
1868 1.1 haad cmd->msg = malloc(msglen);
1869 1.1 haad if (!cmd->msg) {
1870 1.1 haad log_error("Unable to allocate buffer space\n");
1871 1.1 haad free(cmd);
1872 1.1 haad return -1;
1873 1.1 haad }
1874 1.1 haad memcpy(cmd->msg, msg, msglen);
1875 1.1 haad }
1876 1.1 haad else {
1877 1.1 haad cmd->msg = NULL;
1878 1.1 haad }
1879 1.1 haad cmd->client = client;
1880 1.1 haad cmd->msglen = msglen;
1881 1.1 haad cmd->xid = client->xid;
1882 1.1 haad
1883 1.1 haad if (csid) {
1884 1.1 haad memcpy(cmd->csid, csid, max_csid_len);
1885 1.1 haad cmd->remote = 1;
1886 1.1 haad } else {
1887 1.1 haad cmd->remote = 0;
1888 1.1 haad }
1889 1.1 haad
1890 1.1 haad DEBUGLOG
1891 1.1 haad ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n",
1892 1.1 haad cmd, client, msg, msglen, csid, cmd->xid);
1893 1.1 haad pthread_mutex_lock(&lvm_thread_mutex);
1894 1.1 haad dm_list_add(&lvm_cmd_head, &cmd->list);
1895 1.1 haad pthread_cond_signal(&lvm_thread_cond);
1896 1.1 haad pthread_mutex_unlock(&lvm_thread_mutex);
1897 1.1 haad
1898 1.1 haad return 0;
1899 1.1 haad }
1900 1.1 haad
1901 1.1 haad /* Return 0 if we can talk to an existing clvmd */
1902 1.1 haad static int check_local_clvmd(void)
1903 1.1 haad {
1904 1.1 haad int local_socket;
1905 1.1 haad struct sockaddr_un sockaddr;
1906 1.1 haad int ret = 0;
1907 1.1 haad
1908 1.1 haad /* Open local socket */
1909 1.1 haad if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
1910 1.1 haad return -1;
1911 1.1 haad }
1912 1.1 haad
1913 1.1 haad memset(&sockaddr, 0, sizeof(sockaddr));
1914 1.1 haad memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1915 1.1 haad sockaddr.sun_family = AF_UNIX;
1916 1.1 haad
1917 1.1 haad if (connect(local_socket,(struct sockaddr *) &sockaddr,
1918 1.1 haad sizeof(sockaddr))) {
1919 1.1 haad ret = -1;
1920 1.1 haad }
1921 1.1 haad
1922 1.1 haad close(local_socket);
1923 1.1 haad return ret;
1924 1.1 haad }
1925 1.1 haad
1926 1.1 haad
1927 1.1 haad /* Open the local socket, that's the one we talk to libclvm down */
1928 1.1 haad static int open_local_sock()
1929 1.1 haad {
1930 1.1 haad int local_socket;
1931 1.1 haad struct sockaddr_un sockaddr;
1932 1.1 haad
1933 1.1 haad /* Open local socket */
1934 1.1 haad if (CLVMD_SOCKNAME[0] != '\0')
1935 1.1 haad unlink(CLVMD_SOCKNAME);
1936 1.1 haad local_socket = socket(PF_UNIX, SOCK_STREAM, 0);
1937 1.1 haad if (local_socket < 0) {
1938 1.2 dholland log_error("Can't create local socket: %s", strerror(errno));
1939 1.1 haad return -1;
1940 1.1 haad }
1941 1.1 haad /* Set Close-on-exec & non-blocking */
1942 1.1 haad fcntl(local_socket, F_SETFD, 1);
1943 1.1 haad fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK);
1944 1.1 haad
1945 1.1 haad memset(&sockaddr, 0, sizeof(sockaddr));
1946 1.1 haad memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1947 1.1 haad sockaddr.sun_family = AF_UNIX;
1948 1.1 haad if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) {
1949 1.2 dholland log_error("can't bind local socket: %s", strerror(errno));
1950 1.1 haad close(local_socket);
1951 1.1 haad return -1;
1952 1.1 haad }
1953 1.1 haad if (listen(local_socket, 1) != 0) {
1954 1.2 dholland log_error("listen local: %s", strerror(errno));
1955 1.1 haad close(local_socket);
1956 1.1 haad return -1;
1957 1.1 haad }
1958 1.1 haad if (CLVMD_SOCKNAME[0] != '\0')
1959 1.1 haad chmod(CLVMD_SOCKNAME, 0600);
1960 1.1 haad
1961 1.1 haad return local_socket;
1962 1.1 haad }
1963 1.1 haad
1964 1.1 haad void process_message(struct local_client *client, const char *buf, int len,
1965 1.1 haad const char *csid)
1966 1.1 haad {
1967 1.1 haad struct clvm_header *inheader;
1968 1.1 haad
1969 1.1 haad inheader = (struct clvm_header *) buf;
1970 1.1 haad ntoh_clvm(inheader); /* Byteswap fields */
1971 1.1 haad if (inheader->cmd == CLVMD_CMD_REPLY)
1972 1.1 haad process_reply(inheader, len, csid);
1973 1.1 haad else
1974 1.1 haad add_to_lvmqueue(client, inheader, len, csid);
1975 1.1 haad }
1976 1.1 haad
1977 1.1 haad
/*
 * Per-node callback used by check_all_clvmds_running().
 *
 * For each node reported as not up, an EHOSTDOWN reply carrying the text
 * "CLVMD not running" is added to the client's reply list.
 */
static void check_all_callback(struct local_client *client, const char *csid,
			       int node_up)
{
	/*
	 * The length is derived from the string itself (including its
	 * terminating NUL) instead of being hard-coded, so editing the text
	 * cannot desynchronise it.
	 */
	static const char not_running[] = "CLVMD not running";

	if (!node_up)
		add_reply_to_list(client, EHOSTDOWN, csid, not_running,
				  sizeof(not_running));
}
1985 1.1 haad
1986 1.1 haad /* Check to see if all CLVMDs are running (ie one on
1987 1.1 haad every node in the cluster).
1988 1.1 haad If not, returns -1 and prints out a list of errant nodes */
1989 1.1 haad static int check_all_clvmds_running(struct local_client *client)
1990 1.1 haad {
1991 1.1 haad DEBUGLOG("check_all_clvmds_running\n");
1992 1.1 haad return clops->cluster_do_node_callback(client, check_all_callback);
1993 1.1 haad }
1994 1.1 haad
1995 1.1 haad /* Return a local_client struct given a client ID.
1996 1.1 haad client IDs are in network byte order */
1997 1.1 haad static struct local_client *find_client(int clientid)
1998 1.1 haad {
1999 1.1 haad struct local_client *thisfd;
2000 1.1 haad for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) {
2001 1.1 haad if (thisfd->fd == ntohl(clientid))
2002 1.1 haad return thisfd;
2003 1.1 haad }
2004 1.1 haad return NULL;
2005 1.1 haad }
2006 1.1 haad
2007 1.1 haad /* Byte-swapping routines for the header so we
2008 1.1 haad work in a heterogeneous environment */
2009 1.1 haad static void hton_clvm(struct clvm_header *hdr)
2010 1.1 haad {
2011 1.1 haad hdr->status = htonl(hdr->status);
2012 1.1 haad hdr->arglen = htonl(hdr->arglen);
2013 1.1 haad hdr->xid = htons(hdr->xid);
2014 1.1 haad /* Don't swap clientid as it's only a token as far as
2015 1.1 haad remote nodes are concerned */
2016 1.1 haad }
2017 1.1 haad
2018 1.1 haad static void ntoh_clvm(struct clvm_header *hdr)
2019 1.1 haad {
2020 1.1 haad hdr->status = ntohl(hdr->status);
2021 1.1 haad hdr->arglen = ntohl(hdr->arglen);
2022 1.1 haad hdr->xid = ntohs(hdr->xid);
2023 1.1 haad }
2024 1.1 haad
/*
 * Handler for SIGUSR2, which is sent to kill subthreads.  It does nothing
 * but log.
 * NOTE(review): presumably its purpose is to interrupt a blocking call in
 * the target thread; confirm.  DEBUGLOG is unlikely to be
 * async-signal-safe.
 */
static void sigusr2_handler(int sig)
{
	DEBUGLOG("SIGUSR2 received\n");
}
2031 1.1 haad
2032 1.1 haad static void sigterm_handler(int sig)
2033 1.1 haad {
2034 1.1 haad DEBUGLOG("SIGTERM received\n");
2035 1.1 haad quit = 1;
2036 1.1 haad return;
2037 1.1 haad }
2038 1.1 haad
2039 1.1 haad static void sighup_handler(int sig)
2040 1.1 haad {
2041 1.1 haad DEBUGLOG("got SIGHUP\n");
2042 1.1 haad reread_config = 1;
2043 1.1 haad }
2044 1.1 haad
2045 1.1 haad int sync_lock(const char *resource, int mode, int flags, int *lockid)
2046 1.1 haad {
2047 1.1 haad return clops->sync_lock(resource, mode, flags, lockid);
2048 1.1 haad }
2049 1.1 haad
2050 1.1 haad int sync_unlock(const char *resource, int lockid)
2051 1.1 haad {
2052 1.1 haad return clops->sync_unlock(resource, lockid);
2053 1.1 haad }
2054 1.1 haad
2055 1.2 dholland static if_type_t parse_cluster_interface(char *ifname)
2056 1.2 dholland {
2057 1.2 dholland if_type_t iface = IF_AUTO;
2058 1.2 dholland
2059 1.2 dholland if (!strcmp(ifname, "auto"))
2060 1.2 dholland iface = IF_AUTO;
2061 1.2 dholland if (!strcmp(ifname, "cman"))
2062 1.2 dholland iface = IF_CMAN;
2063 1.2 dholland if (!strcmp(ifname, "gulm"))
2064 1.2 dholland iface = IF_GULM;
2065 1.2 dholland if (!strcmp(ifname, "openais"))
2066 1.2 dholland iface = IF_OPENAIS;
2067 1.2 dholland if (!strcmp(ifname, "corosync"))
2068 1.2 dholland iface = IF_COROSYNC;
2069 1.2 dholland
2070 1.2 dholland return iface;
2071 1.2 dholland }
2072 1.2 dholland
2073 1.2 dholland /*
2074 1.2 dholland * Try and find a cluster system in corosync's objdb, if it is running. This is
2075 1.2 dholland * only called if the command-line option is not present, and if it fails
2076 1.2 dholland * we still try the interfaces in order.
2077 1.2 dholland */
2078 1.2 dholland static if_type_t get_cluster_type()
2079 1.2 dholland {
2080 1.2 dholland #ifdef HAVE_COROSYNC_CONFDB_H
2081 1.2 dholland confdb_handle_t handle;
2082 1.2 dholland if_type_t type = IF_AUTO;
2083 1.2 dholland int result;
2084 1.2 dholland char buf[255];
2085 1.2 dholland size_t namelen = sizeof(buf);
2086 1.2 dholland hdb_handle_t cluster_handle;
2087 1.2 dholland hdb_handle_t clvmd_handle;
2088 1.2 dholland confdb_callbacks_t callbacks = {
2089 1.2 dholland .confdb_key_change_notify_fn = NULL,
2090 1.2 dholland .confdb_object_create_change_notify_fn = NULL,
2091 1.2 dholland .confdb_object_delete_change_notify_fn = NULL
2092 1.2 dholland };
2093 1.2 dholland
2094 1.2 dholland result = confdb_initialize (&handle, &callbacks);
2095 1.2 dholland if (result != CS_OK)
2096 1.2 dholland return type;
2097 1.2 dholland
2098 1.2 dholland result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
2099 1.2 dholland if (result != CS_OK)
2100 1.2 dholland goto out;
2101 1.2 dholland
2102 1.2 dholland result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
2103 1.2 dholland if (result != CS_OK)
2104 1.2 dholland goto out;
2105 1.2 dholland
2106 1.2 dholland result = confdb_object_find_start(handle, cluster_handle);
2107 1.2 dholland if (result != CS_OK)
2108 1.2 dholland goto out;
2109 1.2 dholland
2110 1.2 dholland result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle);
2111 1.2 dholland if (result != CS_OK)
2112 1.2 dholland goto out;
2113 1.2 dholland
2114 1.2 dholland result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen);
2115 1.2 dholland if (result != CS_OK)
2116 1.2 dholland goto out;
2117 1.2 dholland
2118 1.2 dholland buf[namelen] = '\0';
2119 1.2 dholland type = parse_cluster_interface(buf);
2120 1.2 dholland DEBUGLOG("got interface type '%s' from confdb\n", buf);
2121 1.2 dholland out:
2122 1.2 dholland confdb_finalize(handle);
2123 1.2 dholland return type;
2124 1.2 dholland #else
2125 1.2 dholland return IF_AUTO;
2126 1.2 dholland #endif
2127 1.2 dholland }
2128