Home | History | Annotate | Line # | Download | only in libisns
isns_thread.c revision 1.1
      1 /*	$NetBSD: isns_thread.c,v 1.1 2011/01/16 01:22:50 agc Exp $	*/
      2 
      3 /*-
      4  * Copyright (c) 2004,2009 The NetBSD Foundation, Inc.
      5  * All rights reserved.
      6  *
      7  * This code is derived from software contributed to The NetBSD Foundation
      8  * by Wasabi Systems, Inc.
      9  *
     10  * Redistribution and use in source and binary forms, with or without
     11  * modification, are permitted provided that the following conditions
     12  * are met:
     13  * 1. Redistributions of source code must retain the above copyright
     14  *    notice, this list of conditions and the following disclaimer.
     15  * 2. Redistributions in binary form must reproduce the above copyright
     16  *    notice, this list of conditions and the following disclaimer in the
     17  *    documentation and/or other materials provided with the distribution.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
     20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
     23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
     29  * POSSIBILITY OF SUCH DAMAGE.
     30  */
     31 
     32 #include <sys/cdefs.h>
     33 __RCSID("$NetBSD: isns_thread.c,v 1.1 2011/01/16 01:22:50 agc Exp $");
     34 
     35 
     36 /*
     37  * isns_thread.c
     38  */
     39 
     40 #include <sys/types.h>
     41 
     42 #include <unistd.h>
     43 
     44 #include "isns.h"
     45 #include "isns_config.h"
     46 #include "isns_defs.h"
     47 
     48 static struct iovec read_buf[2 + (ISNS_MAX_PDU_PAYLOAD / ISNS_BUF_SIZE) +
     49     ((ISNS_MAX_PDU_PAYLOAD % ISNS_BUF_SIZE) != 0)];
     50 
     51 static struct isns_task_s *isns_get_next_task(struct isns_config_s *);
     52 
     53 /*
     54  * isns_control_thread()
     55  */
     56 void *
     57 isns_control_thread(void *arg)
     58 {
     59 	struct isns_config_s *cfg_p = (struct isns_config_s *)arg;
     60 	struct kevent evt_chgs[5], *evt_p;
     61 
     62 	int n, nevents;
     63 	isns_kevent_handler *evt_handler_p;
     64 	int run_thread;
     65 
     66 	run_thread = 1;
     67 
     68 	while (run_thread) {
     69 		/* if no task outstanding, check queue here and send PDU */
     70 		while ((cfg_p->curtask_p == NULL)
     71 		    && ((cfg_p->curtask_p = isns_get_next_task(cfg_p)) != NULL))
     72 			isns_run_task(cfg_p->curtask_p);
     73 
     74 		nevents = kevent(cfg_p->kq, NULL, 0, evt_chgs,
     75 		    ARRAY_ELEMS(evt_chgs), NULL);
     76 
     77 		DBG("isns_control_thread: kevent() nevents=%d\n", nevents);
     78 
     79 		for (n = 0, evt_p = evt_chgs; n < nevents; n++, evt_p++) {
     80 			DBG("event[%d] - data=%d\n", n, (int)evt_p->data);
     81 			evt_handler_p = (void *)evt_p->udata;
     82 			run_thread = (evt_handler_p(evt_p, cfg_p) == 0);
     83 		}
     84 	}
     85 
     86 	return 0;
     87 }
     88 
     89 /*
     90  * isns_get_next_task()
     91  */
     92 static struct isns_task_s *
     93 isns_get_next_task(struct isns_config_s *cfg_p)
     94 {
     95 	struct isns_task_s *task_p = NULL;
     96 
     97 
     98 	DBG("isns_get_next_task: entered\n");
     99 
    100 	task_p = isns_taskq_remove(cfg_p);
    101 
    102 	if (cfg_p->sd_connected)
    103 		return task_p;
    104 	else {
    105 		if (task_p == NULL)
    106 			return NULL;
    107 		else {
    108 			if (task_p->task_type != ISNS_TASK_INIT_SOCKET_IO) {
    109 				isns_taskq_insert_head(cfg_p, task_p);
    110 
    111 				task_p = isns_new_task(cfg_p,
    112 				    ISNS_TASK_RECONNECT_SERVER, 0);
    113 				task_p->var.reconnect_server.ai_p = cfg_p->ai_p;
    114 			}
    115 
    116 			return task_p;
    117 		}
    118 	}
    119 }
    120 
    121 /*
    122  * isns_kevent_pipe()
    123  */
    124 int
    125 isns_kevent_pipe(struct kevent* evt_p, struct isns_config_s *cfg_p)
    126 {
    127 	uint8_t cmd_type;
    128 	int force_isns_stop;
    129 	uint16_t trans_id;
    130 	ssize_t rbytes;
    131 	int pipe_nbytes;
    132 
    133 	force_isns_stop = 0;
    134 	pipe_nbytes = (int)evt_p->data;
    135 
    136 	while (pipe_nbytes > 0) {
    137 		rbytes = read(cfg_p->pipe_fds[0], &cmd_type,
    138 		    sizeof(cmd_type));
    139 		if (rbytes < 0) {
    140 			DBG("isns_kevent_pipe: error on wepe_sys_read\n");
    141 			/*?? should we break here? */
    142 			continue;
    143 		}
    144 
    145 		pipe_nbytes -= (int)rbytes;
    146 		switch (cmd_type) {
    147 		case ISNS_CMD_PROCESS_TASKQ:
    148 			DBG("isns_kevent_pipe: ISNS_CMD_PROCESS_TASKQ\n");
    149 			break;
    150 
    151 		case ISNS_CMD_ABORT_TRANS:
    152 			DBG("isns_kevent_pipe: ISNS_CMD_ABORT_TRANS\n");
    153 			rbytes = read(cfg_p->pipe_fds[0], &trans_id,
    154 			    sizeof(trans_id));
    155 			if ((rbytes < 0) && (rbytes == sizeof(trans_id)))
    156 				isns_abort_trans(cfg_p, trans_id);
    157 			else
    158 				DBG("isns_kevent_pipe: "
    159 				    "error reading trans id\n");
    160 			pipe_nbytes -= (int)rbytes;
    161 			break;
    162 
    163 		case ISNS_CMD_STOP:
    164 			DBG("isns_kevent_pipe: ISNS_CMD_STOP\n");
    165 			force_isns_stop = 1;
    166 			pipe_nbytes = 0;
    167 			break;
    168 
    169 		default:
    170 			DBG("isns_kevent_pipe: unknown command (cmd=%d)\n",
    171 			    cmd_type);
    172 			break;
    173 		}
    174 	}
    175 
    176 	return (force_isns_stop ? 1 : 0);
    177 }
    178 
    179 /*
    180  * isns_is_trans_complete()
    181  */
    182 static int
    183 isns_is_trans_complete(struct isns_trans_s *trans_p)
    184 {
    185 	struct isns_pdu_s *pdu_p;
    186 	uint16_t count;
    187 
    188 	pdu_p = trans_p->pdu_rsp_list;
    189 	count = 0;
    190 	while (pdu_p->next != NULL) {
    191 		if (pdu_p->hdr.seq_id != count++) return 0;
    192 		pdu_p = pdu_p->next;
    193 	}
    194 	if ((pdu_p->hdr.seq_id != count) ||
    195 	    !(pdu_p->hdr.flags & ISNS_FLAG_LAST_PDU))
    196 		return 0;
    197 
    198 	return 1;
    199 }
    200 
    201 /*
    202  * isns_is_valid_resp()
    203  */
    204 static int
    205 isns_is_valid_resp(struct isns_trans_s *trans_p, struct isns_pdu_s *pdu_p)
    206 {
    207 	struct isns_pdu_s *curpdu_p;
    208 
    209 	if (pdu_p->hdr.trans_id != trans_p->id)
    210 		return 0;
    211 	if (pdu_p->hdr.func_id != (trans_p->func_id | 0x8000))
    212 		return 0;
    213 	curpdu_p = trans_p->pdu_rsp_list;
    214 	while (curpdu_p != NULL) {
    215 		if (curpdu_p->hdr.seq_id == pdu_p->hdr.seq_id) return 0;
    216 		curpdu_p = curpdu_p->next;
    217 	}
    218 
    219 	return 1;
    220 }
    221 
    222 /*
    223  * isns_process_in_pdu()
    224  */
    225 static void
    226 isns_process_in_pdu(struct isns_config_s *cfg_p)
    227 {
    228 	struct isns_task_s *curtask_p;
    229 	struct isns_trans_s *trans_p;
    230 
    231 	DBG("isns_process_in_pdu: entered\n");
    232 
    233 	if ((curtask_p = cfg_p->curtask_p) == NULL)
    234 		isns_free_pdu(cfg_p->pdu_in_p);
    235 	else if ((trans_p = curtask_p->var.send_pdu.trans_p) == NULL)
    236 		isns_free_pdu(cfg_p->pdu_in_p);
    237 	else if (!isns_is_valid_resp(trans_p, cfg_p->pdu_in_p))
    238 		isns_free_pdu(cfg_p->pdu_in_p);
    239 	else {
    240 		isns_add_pdu_response(trans_p, cfg_p->pdu_in_p);
    241 
    242 		if (isns_is_trans_complete(trans_p)) {
    243 			isns_complete_trans(trans_p);
    244 			isns_end_task(curtask_p);
    245 		}
    246 	}
    247 
    248 	cfg_p->pdu_in_p = NULL;
    249 }
    250 
    251 /*
    252  * isns_kevent_socket()
    253  */
    254 int
    255 isns_kevent_socket(struct kevent *evt_p, struct isns_config_s *cfg_p)
    256 {
    257 	struct iovec *iovp;
    258 	struct isns_buffer_s *curbuf_p, *newbuf_p;
    259 	struct isns_pdu_s *pdu_p;
    260 	int64_t bavail; /* bytes available in socket buffer */
    261 	uint32_t cur_len, buf_len, unread_len, rd_len, b_len;
    262 	ssize_t rv;
    263 	uint16_t payload_len;
    264 	int iovcnt, more, transport_evt;
    265 
    266 
    267 	DBG("isns_kevent_socket: entered\n");
    268 
    269 	transport_evt = 0;
    270 	bavail = evt_p->data;
    271 	iovp = read_buf;
    272 
    273 	more = (bavail > 0);
    274 	while (more) {
    275 		if (cfg_p->pdu_in_p == NULL) {
    276 			/*
    277  	 		 * Try to form a valid pdu by starting with the hdr.
    278 			 * If there isn't enough data in the socket buffer
    279 			 * to form a full hdr, just return.
    280  	 		 *
    281  	 		 * Once we have read in our hdr, allocate all buffers
    282 			 * needed.
    283  	 		 */
    284 
    285 			if (bavail < (int64_t)sizeof(struct isns_pdu_hdr_s))
    286 				return 0;
    287 
    288 			/* Form a placeholder pdu */
    289 			pdu_p = isns_new_pdu(cfg_p, 0, 0, 0);
    290 
    291 			/* Read the header into our placeholder pdu */
    292 			read_buf[0].iov_base = &(pdu_p->hdr);
    293 			read_buf[0].iov_len = sizeof(struct isns_pdu_hdr_s);
    294 			iovcnt = 1;
    295 
    296 			iovp = read_buf;
    297 			rv = isns_socket_readv(cfg_p->sd, iovp, iovcnt);
    298 			if ((rv == 0) || (rv == -1)) {
    299 				DBG("isns_kevent_socket: isns_socket_readv(1) "
    300 				    "returned %d\n", rv);
    301 				transport_evt = 1;
    302 				break;
    303 			}
    304 
    305 			bavail -= sizeof(struct isns_pdu_hdr_s);
    306 			/*
    307 			 * ToDo: read until sizeof(struct isns_pdu_hdr_s) has
    308 			 *       been read in. This statement should be
    309 			 *
    310 			 *       bavail -= rv;
    311 			 */
    312 
    313 			/* adjust byte order */
    314 			pdu_p->hdr.isnsp_version = isns_ntohs(pdu_p->hdr.
    315 			    isnsp_version);
    316 			pdu_p->hdr.func_id = isns_ntohs(pdu_p->hdr.func_id);
    317 			pdu_p->hdr.payload_len = isns_ntohs(pdu_p->hdr.
    318 			    payload_len);
    319 			pdu_p->hdr.flags = isns_ntohs(pdu_p->hdr.flags);
    320 			pdu_p->hdr.trans_id = isns_ntohs(pdu_p->hdr.trans_id);
    321 			pdu_p->hdr.seq_id = isns_ntohs(pdu_p->hdr.seq_id);
    322 			pdu_p->byteorder_host = 1;
    323 
    324 			/* Try to sense early whether we might have garbage */
    325 			if (pdu_p->hdr.isnsp_version != ISNSP_VERSION) {
    326 				DBG("isns_kevent_socket: pdu_p->hdr."
    327 				    "isnsp_version != ISNSP_VERSION\n");
    328 				isns_free_pdu(pdu_p);
    329 
    330 				transport_evt = 1;
    331 				break;
    332 			}
    333 
    334 			/* Allocate all the necessary payload buffers */
    335 			payload_len = pdu_p->hdr.payload_len;
    336 			curbuf_p = pdu_p->payload_p;
    337 			buf_len = 0;
    338 			while (buf_len + curbuf_p->alloc_len < payload_len) {
    339 				buf_len += curbuf_p->alloc_len;
    340 				newbuf_p = isns_new_buffer(0);
    341 				curbuf_p->next = newbuf_p;
    342 				curbuf_p = newbuf_p;
    343 			}
    344 			curbuf_p->next = NULL;
    345 
    346 			/* Hold on to our placeholder pdu */
    347 			cfg_p->pdu_in_p = pdu_p;
    348 			more = (bavail > 0) ? 1 : 0;
    349 		} else if (bavail > 0) {
    350 			/*
    351  	 		 * Fill in the pdu payload data.
    352 			 *
    353  	 		 * If we can fill it all in now
    354 	 		 *     -AND- it corresponds to the active transaction
    355 			 *           then add the pdu to the transaction's
    356 			 *           pdu_rsp_list
    357 	 		 *     -AND- it does not correspond to the active
    358 			 *           transaction (or there is no active
    359 			 *           transaction) then drop it on the floor.
    360 			 * We may not be able to fill it all in now.
    361 	 		 *     -EITHER WAY- fill in as much payload data now
    362 			 *                  as we can.
    363  	 		 */
    364 
    365 			/* Refer to our placeholder pdu */
    366 			pdu_p = cfg_p->pdu_in_p;
    367 
    368 			/* How much payload data has been filled in? */
    369 			cur_len = 0;
    370 			curbuf_p = pdu_p->payload_p;
    371 			while (curbuf_p->cur_len == curbuf_p->alloc_len) {
    372 				cur_len += curbuf_p->cur_len;
    373 				curbuf_p = curbuf_p->next;
    374 			}
    375 			cur_len += curbuf_p->cur_len;
    376 
    377 			/* How much payload data is left to be filled in? */
    378 			unread_len = pdu_p->hdr.payload_len - cur_len;
    379 
    380 			/* Read as much remaining payload data as possible */
    381 			iovcnt = 0;
    382 			while (curbuf_p->next != NULL) {
    383 				read_buf[iovcnt].iov_base = isns_buffer_data(
    384 			    	    curbuf_p, curbuf_p->cur_len);
    385 				read_buf[iovcnt].iov_len = curbuf_p->alloc_len -
    386 			    	    curbuf_p->cur_len;
    387 				iovcnt++;
    388 
    389 				curbuf_p = curbuf_p->next;
    390 			}
    391 			read_buf[iovcnt].iov_base = isns_buffer_data(curbuf_p,
    392 		    	    curbuf_p->cur_len);
    393 			read_buf[iovcnt].iov_len = unread_len;
    394 			iovcnt++;
    395 
    396 			rv = isns_socket_readv(cfg_p->sd, iovp, iovcnt);
    397 			if ((rv == 0) || (rv == -1)) {
    398 				DBG("isns_kevent_socket: isns_socket_readv(2) "
    399 			    	    "returned %d\n",rv);
    400 				isns_free_pdu(cfg_p->pdu_in_p);
    401 				cfg_p->pdu_in_p = NULL;
    402 
    403 				transport_evt = 1;
    404 				break;
    405 			}
    406 
    407 			/* Update cur_len in buffers that newly have data */
    408 			curbuf_p = pdu_p->payload_p;
    409 			while (curbuf_p->cur_len == curbuf_p->alloc_len)
    410 				curbuf_p = curbuf_p->next;
    411 
    412 			rd_len = (uint32_t)rv;
    413 			do {
    414 				b_len = curbuf_p->alloc_len - curbuf_p->cur_len;
    415 				if (rd_len > b_len) {
    416 					curbuf_p->cur_len = curbuf_p->alloc_len;
    417 					rd_len -= b_len;
    418 				} else {
    419 					curbuf_p->cur_len += rd_len;
    420 					break;
    421 				}
    422 
    423 				curbuf_p = curbuf_p->next;
    424 			} while (curbuf_p != NULL);
    425 
    426 			bavail -= rv;
    427 
    428 			if (rv == (int)unread_len)
    429 				isns_process_in_pdu(cfg_p);
    430 
    431 			more = (bavail > (int64_t)sizeof(struct isns_pdu_hdr_s)) ? 1 : 0;
    432 		}
    433 	}
    434 
    435 	transport_evt |= (evt_p->flags & EV_EOF);
    436 	if (transport_evt) {
    437 		DBG("isns_kevent_socket: processing transport event\n");
    438 
    439 		isns_socket_close(cfg_p->sd);
    440 		cfg_p->sd_connected = 0;
    441 
    442 		if (cfg_p->curtask_p != NULL)
    443 			isns_process_connection_loss(cfg_p);
    444 
    445 		if (cfg_p->pdu_in_p != NULL) {
    446 			isns_free_pdu(cfg_p->pdu_in_p);
    447 			cfg_p->pdu_in_p = NULL;
    448 		}
    449 	}
    450 
    451 	return 0;
    452 }
    453 
    454 /* ARGSUSED */
    455 /*
    456  * isns_kevent_timer_recon()
    457  */
    458 int
    459 isns_kevent_timer_recon(struct kevent *evt_p, struct isns_config_s *cfg_p)
    460 {
    461 	int rv;
    462 
    463 
    464 	DBG("isns_kevent_timer_recon: entered\n");
    465 
    466 	rv = isns_socket_create(&(cfg_p->sd), cfg_p->ai_p->ai_family,
    467 		cfg_p->ai_p->ai_socktype);
    468 	if (rv != 0)
    469 		return 0;
    470 
    471 	rv = isns_socket_connect(cfg_p->sd, cfg_p->ai_p->ai_addr,
    472 	    cfg_p->ai_p->ai_addrlen);
    473 	if (rv == 0) {
    474 		/* Remove ISNS_EVT_TIMER_RECON from kqueue */
    475 		rv = isns_change_kevent_list(cfg_p,
    476 		    (uintptr_t)ISNS_EVT_TIMER_RECON, EVFILT_TIMER, EV_DELETE,
    477 		    (int64_t)0, (intptr_t)0);
    478 		if (rv == -1)
    479 			DBG("isns_kevent_timer_recon: error on "
    480 			    "isns_change_kevent_list(1)\n");
    481 
    482 		cfg_p->sd_connected = 1;
    483 
    484 		/* Add cfg_p->sd to kqueue */
    485 		rv = isns_change_kevent_list(cfg_p, (uintptr_t)cfg_p->sd,
    486 		    EVFILT_READ, EV_ADD | EV_CLEAR, (int64_t)0,
    487 		    (intptr_t)isns_kevent_socket);
    488 		if (rv == -1)
    489 			DBG("isns_kevent_timer_recon: error on "
    490 			    "isns_change_kevent_list(2)\n");
    491 
    492 		isns_end_task(cfg_p->curtask_p);
    493 	}
    494 
    495 	return 0;
    496 }
    497 
    498 
    499 /* ARGSUSED */
    500 /*
    501  * isns_kevent_timer_refresh
    502  */
    503 int
    504 isns_kevent_timer_refresh(struct kevent* evt_p, struct isns_config_s *cfg_p)
    505 {
    506 	struct isns_refresh_s *ref_p;
    507 	ISNS_TRANS trans;
    508 	uint32_t status;
    509 	int rval;
    510 
    511 	DBG("isns_kevent_timer_refresh: entered\n");
    512 
    513 	/* If refresh info pointer NULL, or no name assigned, just return. */
    514 	ref_p = cfg_p->refresh_p;
    515 	if ((ref_p == NULL) || (ref_p->node[0] == '\0'))
    516 	    	return 0;
    517 
    518 	if (ref_p->trans_p != NULL) {
    519 		/* If the previous refresh trans is not complete, return. */
    520 		rval = isns_get_pdu_response_status(ref_p->trans_p, &status);
    521 		if (rval == EPERM) {
    522 			DBG("isns_kevent_timer_refresh: "
    523 			    "prev refresh trans not complete\n");
    524 			return 0;
    525 		}
    526 		/* Free previous refresh trans. */
    527 		isns_free_trans(ref_p->trans_p);
    528 		ref_p->trans_p = NULL;
    529 	}
    530 
    531 	/* Build new refresh transaction and send it. */
    532 	trans = isns_new_trans((ISNS_HANDLE)cfg_p, isnsp_DevAttrQry, 0);
    533 	if (trans == ISNS_INVALID_TRANS) {
    534 		DBG("isns_kevent_timer_refresh: error on isns_new_trans()\n");
    535 		return 0;
    536 	}
    537 
    538 	ref_p->trans_p = (struct isns_trans_s *)trans;
    539 	/* First we add our source attribute */
    540 	isns_add_string(trans, isnst_iSCSIName, ref_p->node);
    541 	/* Now add our message attribute */
    542 	isns_add_string(trans, isnst_iSCSIName, ref_p->node);
    543 	isns_add_tlv(trans, isnst_Delimiter, 0, NULL);
    544 	/* and finally the operating attributes */
    545 	isns_add_tlv(trans, isnst_EID, 0, NULL);
    546 	isns_send_trans(trans, NULL, NULL);
    547 
    548 	return 0;
    549 }
    550