Home | History | Annotate | Line # | Download | only in libisns
      1 /*	$NetBSD: isns_thread.c,v 1.2 2019/07/03 18:40:33 dholland Exp $	*/
      2 
      3 /*-
      4  * Copyright (c) 2004,2009 The NetBSD Foundation, Inc.
      5  * All rights reserved.
      6  *
      7  * This code is derived from software contributed to The NetBSD Foundation
      8  * by Wasabi Systems, Inc.
      9  *
     10  * Redistribution and use in source and binary forms, with or without
     11  * modification, are permitted provided that the following conditions
     12  * are met:
     13  * 1. Redistributions of source code must retain the above copyright
     14  *    notice, this list of conditions and the following disclaimer.
     15  * 2. Redistributions in binary form must reproduce the above copyright
     16  *    notice, this list of conditions and the following disclaimer in the
     17  *    documentation and/or other materials provided with the distribution.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
     20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
     23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
     29  * POSSIBILITY OF SUCH DAMAGE.
     30  */
     31 
     32 #include <sys/cdefs.h>
     33 __RCSID("$NetBSD: isns_thread.c,v 1.2 2019/07/03 18:40:33 dholland Exp $");
     34 
     35 
     36 /*
     37  * isns_thread.c
     38  */
     39 
     40 #include <sys/types.h>
     41 
     42 #include <unistd.h>
     43 
     44 #include "isns.h"
     45 #include "isns_config.h"
     46 #include "isns_defs.h"
     47 
     48 static struct iovec read_buf[2 + (ISNS_MAX_PDU_PAYLOAD / ISNS_BUF_SIZE) +
     49     ((ISNS_MAX_PDU_PAYLOAD % ISNS_BUF_SIZE) != 0)];
     50 
     51 static struct isns_task_s *isns_get_next_task(struct isns_config_s *);
     52 
     53 /*
     54  * isns_control_thread()
     55  */
     56 void *
     57 isns_control_thread(void *arg)
     58 {
     59 	struct isns_config_s *cfg_p = (struct isns_config_s *)arg;
     60 	struct kevent evt_chgs[5], *evt_p;
     61 
     62 	int n, nevents;
     63 	isns_kevent_handler *evt_handler_p;
     64 	int run_thread;
     65 
     66 	run_thread = 1;
     67 
     68 	while (run_thread) {
     69 		/* if no task outstanding, check queue here and send PDU */
     70 		while ((cfg_p->curtask_p == NULL)
     71 		    && ((cfg_p->curtask_p = isns_get_next_task(cfg_p)) != NULL))
     72 			isns_run_task(cfg_p->curtask_p);
     73 
     74 		nevents = kevent(cfg_p->kq, NULL, 0, evt_chgs,
     75 		    ARRAY_ELEMS(evt_chgs), NULL);
     76 
     77 		DBG("isns_control_thread: kevent() nevents=%d\n", nevents);
     78 
     79 		for (n = 0, evt_p = evt_chgs; n < nevents; n++, evt_p++) {
     80 			DBG("event[%d] - data=%d\n", n, (int)evt_p->data);
     81 			evt_handler_p = (void *)evt_p->udata;
     82 			run_thread = (evt_handler_p(evt_p, cfg_p) == 0);
     83 		}
     84 	}
     85 
     86 	return 0;
     87 }
     88 
     89 /*
     90  * isns_get_next_task()
     91  */
     92 static struct isns_task_s *
     93 isns_get_next_task(struct isns_config_s *cfg_p)
     94 {
     95 	struct isns_task_s *task_p = NULL;
     96 
     97 
     98 	DBG("isns_get_next_task: entered\n");
     99 
    100 	task_p = isns_taskq_remove(cfg_p);
    101 
    102 	if (cfg_p->sd_connected)
    103 		return task_p;
    104 	else {
    105 		if (task_p == NULL)
    106 			return NULL;
    107 		else {
    108 			if (task_p->task_type != ISNS_TASK_INIT_SOCKET_IO) {
    109 				isns_taskq_insert_head(cfg_p, task_p);
    110 
    111 				task_p = isns_new_task(cfg_p,
    112 				    ISNS_TASK_RECONNECT_SERVER, 0);
    113 				task_p->var.reconnect_server.ai_p = cfg_p->ai_p;
    114 			}
    115 
    116 			return task_p;
    117 		}
    118 	}
    119 }
    120 
    121 /*
    122  * isns_kevent_pipe()
    123  */
    124 int
    125 isns_kevent_pipe(struct kevent* evt_p, struct isns_config_s *cfg_p)
    126 {
    127 	uint8_t cmd_type;
    128 	int force_isns_stop;
    129 	uint16_t trans_id;
    130 	ssize_t rbytes;
    131 	int pipe_nbytes;
    132 
    133 	force_isns_stop = 0;
    134 	pipe_nbytes = (int)evt_p->data;
    135 
    136 	while (pipe_nbytes > 0) {
    137 		rbytes = read(cfg_p->pipe_fds[0], &cmd_type,
    138 		    sizeof(cmd_type));
    139 		if (rbytes < 0) {
    140 			DBG("isns_kevent_pipe: error on wepe_sys_read\n");
    141 			/*?? should we break here? */
    142 			continue;
    143 		}
    144 
    145 		pipe_nbytes -= (int)rbytes;
    146 		switch (cmd_type) {
    147 		case ISNS_CMD_PROCESS_TASKQ:
    148 			DBG("isns_kevent_pipe: ISNS_CMD_PROCESS_TASKQ\n");
    149 			break;
    150 
    151 		case ISNS_CMD_ABORT_TRANS:
    152 			DBG("isns_kevent_pipe: ISNS_CMD_ABORT_TRANS\n");
    153 			rbytes = read(cfg_p->pipe_fds[0], &trans_id,
    154 			    sizeof(trans_id));
    155 			if (rbytes < 0)
    156 				DBG("isns_kevent_pipe: "
    157 				    "error reading trans id\n");
    158 			else if (rbytes != sizeof(trans_id))
    159 				DBG("isns_kevent_pipe: "
    160 				    "short read reading trans id\n");
    161 			else {
    162 				isns_abort_trans(cfg_p, trans_id);
    163 				pipe_nbytes -= (int)rbytes;
    164 			}
    165 			break;
    166 
    167 		case ISNS_CMD_STOP:
    168 			DBG("isns_kevent_pipe: ISNS_CMD_STOP\n");
    169 			force_isns_stop = 1;
    170 			pipe_nbytes = 0;
    171 			break;
    172 
    173 		default:
    174 			DBG("isns_kevent_pipe: unknown command (cmd=%d)\n",
    175 			    cmd_type);
    176 			break;
    177 		}
    178 	}
    179 
    180 	return (force_isns_stop ? 1 : 0);
    181 }
    182 
    183 /*
    184  * isns_is_trans_complete()
    185  */
    186 static int
    187 isns_is_trans_complete(struct isns_trans_s *trans_p)
    188 {
    189 	struct isns_pdu_s *pdu_p;
    190 	uint16_t count;
    191 
    192 	pdu_p = trans_p->pdu_rsp_list;
    193 	count = 0;
    194 	while (pdu_p->next != NULL) {
    195 		if (pdu_p->hdr.seq_id != count++) return 0;
    196 		pdu_p = pdu_p->next;
    197 	}
    198 	if ((pdu_p->hdr.seq_id != count) ||
    199 	    !(pdu_p->hdr.flags & ISNS_FLAG_LAST_PDU))
    200 		return 0;
    201 
    202 	return 1;
    203 }
    204 
    205 /*
    206  * isns_is_valid_resp()
    207  */
    208 static int
    209 isns_is_valid_resp(struct isns_trans_s *trans_p, struct isns_pdu_s *pdu_p)
    210 {
    211 	struct isns_pdu_s *curpdu_p;
    212 
    213 	if (pdu_p->hdr.trans_id != trans_p->id)
    214 		return 0;
    215 	if (pdu_p->hdr.func_id != (trans_p->func_id | 0x8000))
    216 		return 0;
    217 	curpdu_p = trans_p->pdu_rsp_list;
    218 	while (curpdu_p != NULL) {
    219 		if (curpdu_p->hdr.seq_id == pdu_p->hdr.seq_id) return 0;
    220 		curpdu_p = curpdu_p->next;
    221 	}
    222 
    223 	return 1;
    224 }
    225 
    226 /*
    227  * isns_process_in_pdu()
    228  */
    229 static void
    230 isns_process_in_pdu(struct isns_config_s *cfg_p)
    231 {
    232 	struct isns_task_s *curtask_p;
    233 	struct isns_trans_s *trans_p;
    234 
    235 	DBG("isns_process_in_pdu: entered\n");
    236 
    237 	if ((curtask_p = cfg_p->curtask_p) == NULL)
    238 		isns_free_pdu(cfg_p->pdu_in_p);
    239 	else if ((trans_p = curtask_p->var.send_pdu.trans_p) == NULL)
    240 		isns_free_pdu(cfg_p->pdu_in_p);
    241 	else if (!isns_is_valid_resp(trans_p, cfg_p->pdu_in_p))
    242 		isns_free_pdu(cfg_p->pdu_in_p);
    243 	else {
    244 		isns_add_pdu_response(trans_p, cfg_p->pdu_in_p);
    245 
    246 		if (isns_is_trans_complete(trans_p)) {
    247 			isns_complete_trans(trans_p);
    248 			isns_end_task(curtask_p);
    249 		}
    250 	}
    251 
    252 	cfg_p->pdu_in_p = NULL;
    253 }
    254 
    255 /*
    256  * isns_kevent_socket()
    257  */
    258 int
    259 isns_kevent_socket(struct kevent *evt_p, struct isns_config_s *cfg_p)
    260 {
    261 	struct iovec *iovp;
    262 	struct isns_buffer_s *curbuf_p, *newbuf_p;
    263 	struct isns_pdu_s *pdu_p;
    264 	int64_t bavail; /* bytes available in socket buffer */
    265 	uint32_t cur_len, buf_len, unread_len, rd_len, b_len;
    266 	ssize_t rv;
    267 	uint16_t payload_len;
    268 	int iovcnt, more, transport_evt;
    269 
    270 
    271 	DBG("isns_kevent_socket: entered\n");
    272 
    273 	transport_evt = 0;
    274 	bavail = evt_p->data;
    275 	iovp = read_buf;
    276 
    277 	more = (bavail > 0);
    278 	while (more) {
    279 		if (cfg_p->pdu_in_p == NULL) {
    280 			/*
    281  	 		 * Try to form a valid pdu by starting with the hdr.
    282 			 * If there isn't enough data in the socket buffer
    283 			 * to form a full hdr, just return.
    284  	 		 *
    285  	 		 * Once we have read in our hdr, allocate all buffers
    286 			 * needed.
    287  	 		 */
    288 
    289 			if (bavail < (int64_t)sizeof(struct isns_pdu_hdr_s))
    290 				return 0;
    291 
    292 			/* Form a placeholder pdu */
    293 			pdu_p = isns_new_pdu(cfg_p, 0, 0, 0);
    294 
    295 			/* Read the header into our placeholder pdu */
    296 			read_buf[0].iov_base = &(pdu_p->hdr);
    297 			read_buf[0].iov_len = sizeof(struct isns_pdu_hdr_s);
    298 			iovcnt = 1;
    299 
    300 			iovp = read_buf;
    301 			rv = isns_socket_readv(cfg_p->sd, iovp, iovcnt);
    302 			if ((rv == 0) || (rv == -1)) {
    303 				DBG("isns_kevent_socket: isns_socket_readv(1) "
    304 				    "returned %d\n", rv);
    305 				transport_evt = 1;
    306 				break;
    307 			}
    308 
    309 			bavail -= sizeof(struct isns_pdu_hdr_s);
    310 			/*
    311 			 * ToDo: read until sizeof(struct isns_pdu_hdr_s) has
    312 			 *       been read in. This statement should be
    313 			 *
    314 			 *       bavail -= rv;
    315 			 */
    316 
    317 			/* adjust byte order */
    318 			pdu_p->hdr.isnsp_version = isns_ntohs(pdu_p->hdr.
    319 			    isnsp_version);
    320 			pdu_p->hdr.func_id = isns_ntohs(pdu_p->hdr.func_id);
    321 			pdu_p->hdr.payload_len = isns_ntohs(pdu_p->hdr.
    322 			    payload_len);
    323 			pdu_p->hdr.flags = isns_ntohs(pdu_p->hdr.flags);
    324 			pdu_p->hdr.trans_id = isns_ntohs(pdu_p->hdr.trans_id);
    325 			pdu_p->hdr.seq_id = isns_ntohs(pdu_p->hdr.seq_id);
    326 			pdu_p->byteorder_host = 1;
    327 
    328 			/* Try to sense early whether we might have garbage */
    329 			if (pdu_p->hdr.isnsp_version != ISNSP_VERSION) {
    330 				DBG("isns_kevent_socket: pdu_p->hdr."
    331 				    "isnsp_version != ISNSP_VERSION\n");
    332 				isns_free_pdu(pdu_p);
    333 
    334 				transport_evt = 1;
    335 				break;
    336 			}
    337 
    338 			/* Allocate all the necessary payload buffers */
    339 			payload_len = pdu_p->hdr.payload_len;
    340 			curbuf_p = pdu_p->payload_p;
    341 			buf_len = 0;
    342 			while (buf_len + curbuf_p->alloc_len < payload_len) {
    343 				buf_len += curbuf_p->alloc_len;
    344 				newbuf_p = isns_new_buffer(0);
    345 				curbuf_p->next = newbuf_p;
    346 				curbuf_p = newbuf_p;
    347 			}
    348 			curbuf_p->next = NULL;
    349 
    350 			/* Hold on to our placeholder pdu */
    351 			cfg_p->pdu_in_p = pdu_p;
    352 			more = (bavail > 0) ? 1 : 0;
    353 		} else if (bavail > 0) {
    354 			/*
    355  	 		 * Fill in the pdu payload data.
    356 			 *
    357  	 		 * If we can fill it all in now
    358 	 		 *     -AND- it corresponds to the active transaction
    359 			 *           then add the pdu to the transaction's
    360 			 *           pdu_rsp_list
    361 	 		 *     -AND- it does not correspond to the active
    362 			 *           transaction (or there is no active
    363 			 *           transaction) then drop it on the floor.
    364 			 * We may not be able to fill it all in now.
    365 	 		 *     -EITHER WAY- fill in as much payload data now
    366 			 *                  as we can.
    367  	 		 */
    368 
    369 			/* Refer to our placeholder pdu */
    370 			pdu_p = cfg_p->pdu_in_p;
    371 
    372 			/* How much payload data has been filled in? */
    373 			cur_len = 0;
    374 			curbuf_p = pdu_p->payload_p;
    375 			while (curbuf_p->cur_len == curbuf_p->alloc_len) {
    376 				cur_len += curbuf_p->cur_len;
    377 				curbuf_p = curbuf_p->next;
    378 			}
    379 			cur_len += curbuf_p->cur_len;
    380 
    381 			/* How much payload data is left to be filled in? */
    382 			unread_len = pdu_p->hdr.payload_len - cur_len;
    383 
    384 			/* Read as much remaining payload data as possible */
    385 			iovcnt = 0;
    386 			while (curbuf_p->next != NULL) {
    387 				read_buf[iovcnt].iov_base = isns_buffer_data(
    388 			    	    curbuf_p, curbuf_p->cur_len);
    389 				read_buf[iovcnt].iov_len = curbuf_p->alloc_len -
    390 			    	    curbuf_p->cur_len;
    391 				iovcnt++;
    392 
    393 				curbuf_p = curbuf_p->next;
    394 			}
    395 			read_buf[iovcnt].iov_base = isns_buffer_data(curbuf_p,
    396 		    	    curbuf_p->cur_len);
    397 			read_buf[iovcnt].iov_len = unread_len;
    398 			iovcnt++;
    399 
    400 			rv = isns_socket_readv(cfg_p->sd, iovp, iovcnt);
    401 			if ((rv == 0) || (rv == -1)) {
    402 				DBG("isns_kevent_socket: isns_socket_readv(2) "
    403 			    	    "returned %d\n",rv);
    404 				isns_free_pdu(cfg_p->pdu_in_p);
    405 				cfg_p->pdu_in_p = NULL;
    406 
    407 				transport_evt = 1;
    408 				break;
    409 			}
    410 
    411 			/* Update cur_len in buffers that newly have data */
    412 			curbuf_p = pdu_p->payload_p;
    413 			while (curbuf_p->cur_len == curbuf_p->alloc_len)
    414 				curbuf_p = curbuf_p->next;
    415 
    416 			rd_len = (uint32_t)rv;
    417 			do {
    418 				b_len = curbuf_p->alloc_len - curbuf_p->cur_len;
    419 				if (rd_len > b_len) {
    420 					curbuf_p->cur_len = curbuf_p->alloc_len;
    421 					rd_len -= b_len;
    422 				} else {
    423 					curbuf_p->cur_len += rd_len;
    424 					break;
    425 				}
    426 
    427 				curbuf_p = curbuf_p->next;
    428 			} while (curbuf_p != NULL);
    429 
    430 			bavail -= rv;
    431 
    432 			if (rv == (int)unread_len)
    433 				isns_process_in_pdu(cfg_p);
    434 
    435 			more = (bavail > (int64_t)sizeof(struct isns_pdu_hdr_s)) ? 1 : 0;
    436 		}
    437 	}
    438 
    439 	transport_evt |= (evt_p->flags & EV_EOF);
    440 	if (transport_evt) {
    441 		DBG("isns_kevent_socket: processing transport event\n");
    442 
    443 		isns_socket_close(cfg_p->sd);
    444 		cfg_p->sd_connected = 0;
    445 
    446 		if (cfg_p->curtask_p != NULL)
    447 			isns_process_connection_loss(cfg_p);
    448 
    449 		if (cfg_p->pdu_in_p != NULL) {
    450 			isns_free_pdu(cfg_p->pdu_in_p);
    451 			cfg_p->pdu_in_p = NULL;
    452 		}
    453 	}
    454 
    455 	return 0;
    456 }
    457 
    458 /* ARGSUSED */
    459 /*
    460  * isns_kevent_timer_recon()
    461  */
    462 int
    463 isns_kevent_timer_recon(struct kevent *evt_p, struct isns_config_s *cfg_p)
    464 {
    465 	int rv;
    466 
    467 
    468 	DBG("isns_kevent_timer_recon: entered\n");
    469 
    470 	rv = isns_socket_create(&(cfg_p->sd), cfg_p->ai_p->ai_family,
    471 		cfg_p->ai_p->ai_socktype);
    472 	if (rv != 0)
    473 		return 0;
    474 
    475 	rv = isns_socket_connect(cfg_p->sd, cfg_p->ai_p->ai_addr,
    476 	    cfg_p->ai_p->ai_addrlen);
    477 	if (rv == 0) {
    478 		/* Remove ISNS_EVT_TIMER_RECON from kqueue */
    479 		rv = isns_change_kevent_list(cfg_p,
    480 		    (uintptr_t)ISNS_EVT_TIMER_RECON, EVFILT_TIMER, EV_DELETE,
    481 		    (int64_t)0, (intptr_t)0);
    482 		if (rv == -1)
    483 			DBG("isns_kevent_timer_recon: error on "
    484 			    "isns_change_kevent_list(1)\n");
    485 
    486 		cfg_p->sd_connected = 1;
    487 
    488 		/* Add cfg_p->sd to kqueue */
    489 		rv = isns_change_kevent_list(cfg_p, (uintptr_t)cfg_p->sd,
    490 		    EVFILT_READ, EV_ADD | EV_CLEAR, (int64_t)0,
    491 		    (intptr_t)isns_kevent_socket);
    492 		if (rv == -1)
    493 			DBG("isns_kevent_timer_recon: error on "
    494 			    "isns_change_kevent_list(2)\n");
    495 
    496 		isns_end_task(cfg_p->curtask_p);
    497 	}
    498 
    499 	return 0;
    500 }
    501 
    502 
    503 /* ARGSUSED */
    504 /*
    505  * isns_kevent_timer_refresh
    506  */
    507 int
    508 isns_kevent_timer_refresh(struct kevent* evt_p, struct isns_config_s *cfg_p)
    509 {
    510 	struct isns_refresh_s *ref_p;
    511 	ISNS_TRANS trans;
    512 	uint32_t status;
    513 	int rval;
    514 
    515 	DBG("isns_kevent_timer_refresh: entered\n");
    516 
    517 	/* If refresh info pointer NULL, or no name assigned, just return. */
    518 	ref_p = cfg_p->refresh_p;
    519 	if ((ref_p == NULL) || (ref_p->node[0] == '\0'))
    520 	    	return 0;
    521 
    522 	if (ref_p->trans_p != NULL) {
    523 		/* If the previous refresh trans is not complete, return. */
    524 		rval = isns_get_pdu_response_status(ref_p->trans_p, &status);
    525 		if (rval == EPERM) {
    526 			DBG("isns_kevent_timer_refresh: "
    527 			    "prev refresh trans not complete\n");
    528 			return 0;
    529 		}
    530 		/* Free previous refresh trans. */
    531 		isns_free_trans(ref_p->trans_p);
    532 		ref_p->trans_p = NULL;
    533 	}
    534 
    535 	/* Build new refresh transaction and send it. */
    536 	trans = isns_new_trans((ISNS_HANDLE)cfg_p, isnsp_DevAttrQry, 0);
    537 	if (trans == ISNS_INVALID_TRANS) {
    538 		DBG("isns_kevent_timer_refresh: error on isns_new_trans()\n");
    539 		return 0;
    540 	}
    541 
    542 	ref_p->trans_p = (struct isns_trans_s *)trans;
    543 	/* First we add our source attribute */
    544 	isns_add_string(trans, isnst_iSCSIName, ref_p->node);
    545 	/* Now add our message attribute */
    546 	isns_add_string(trans, isnst_iSCSIName, ref_p->node);
    547 	isns_add_tlv(trans, isnst_Delimiter, 0, NULL);
    548 	/* and finally the operating attributes */
    549 	isns_add_tlv(trans, isnst_EID, 0, NULL);
    550 	isns_send_trans(trans, NULL, NULL);
    551 
    552 	return 0;
    553 }
    554