Home | History | Annotate | Line # | Download | only in libisns
      1 /*	$NetBSD: isns_task.c,v 1.1.1.1 2011/01/16 01:22:50 agc Exp $	*/
      2 
      3 /*-
      4  * Copyright (c) 2004,2009 The NetBSD Foundation, Inc.
      5  * All rights reserved.
      6  *
      7  * This code is derived from software contributed to The NetBSD Foundation
      8  * by Wasabi Systems, Inc.
      9  *
     10  * Redistribution and use in source and binary forms, with or without
     11  * modification, are permitted provided that the following conditions
     12  * are met:
     13  * 1. Redistributions of source code must retain the above copyright
     14  *    notice, this list of conditions and the following disclaimer.
     15  * 2. Redistributions in binary form must reproduce the above copyright
     16  *    notice, this list of conditions and the following disclaimer in the
     17  *    documentation and/or other materials provided with the distribution.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
     20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
     23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
     29  * POSSIBILITY OF SUCH DAMAGE.
     30  */
     31 
     32 #include <sys/cdefs.h>
     33 __RCSID("$NetBSD: isns_task.c,v 1.1.1.1 2011/01/16 01:22:50 agc Exp $");
     34 
     35 /*
     36  * isns_task.c
     37  */
     38 
     39 #include <sys/types.h>
     40 #include <sys/socket.h>
     41 #include <netinet/in.h>
     42 
     43 #include "isns.h"
     44 #include "isns_config.h"
     45 
     46 static struct iovec write_buf[2 + (ISNS_MAX_PDU_PAYLOAD / ISNS_BUF_SIZE) +
     47     ((ISNS_MAX_PDU_PAYLOAD % ISNS_BUF_SIZE) != 0)];
     48 
     49 static isns_task_handler isns_task_discover_server;
     50 static isns_task_handler isns_task_reconnect_server;
     51 static isns_task_handler isns_task_send_pdu;
     52 static isns_task_handler isns_task_init_socket_io;
     53 static isns_task_handler isns_task_init_refresh;
     54 
     55 
     56 void
     57 isns_run_task(struct isns_task_s *task_p)
     58 {
     59 	static isns_task_handler *task_dispatch_table[ISNS_NUM_TASKS] = {
     60 		isns_task_discover_server,
     61 		isns_task_reconnect_server,
     62 		isns_task_send_pdu,
     63 		isns_task_init_socket_io,
     64 		isns_task_init_refresh
     65 	};
     66 
     67 	DBG("isns_run_task: task_type=%d\n", task_p->task_type);
     68 
     69 	if (task_p->task_type < ARRAY_ELEMS(task_dispatch_table))
     70 		task_dispatch_table[task_p->task_type](task_p);
     71 	else
     72 		DBG("isns_run_task: unknown task type=%d\n", task_p->task_type);
     73 }
     74 
     75 
     76 int
     77 isns_wait_task(struct isns_task_s *task_p, const struct timespec *timeout_p)
     78 {
     79 	struct timeval tv_now;
     80 	struct timespec ts_abstime;
     81 	int rval;
     82 
     83 	DBG("isns_wait_task: waitable=%d\n", task_p->waitable);
     84 
     85 	if (!task_p->waitable)
     86 		return EPERM;
     87 
     88 	pthread_mutex_lock(&task_p->wait_mutex);
     89 
     90 	if (timeout_p == NULL) {
     91 		rval = pthread_cond_wait(&task_p->wait_condvar,
     92 		    &task_p->wait_mutex);
     93 	} else {
     94 		gettimeofday(&tv_now, NULL);
     95 		TIMEVAL_TO_TIMESPEC(&tv_now, &ts_abstime);
     96 		timespecadd(&ts_abstime, timeout_p, &ts_abstime);
     97 
     98 		rval = pthread_cond_timedwait(&task_p->wait_condvar,
     99 		    &task_p->wait_mutex, &ts_abstime);
    100 	}
    101 
    102 	pthread_mutex_unlock(&task_p->wait_mutex);
    103 
    104 	isns_free_task(task_p);
    105 
    106 	DBG("isns_wait_task: wait done (rval=%d)\n", rval);
    107 
    108 	return rval;
    109 }
    110 
    111 
    112 void
    113 isns_end_task(struct isns_task_s *task_p)
    114 {
    115 	DBG("isns_end_task: %p\n", task_p);
    116 	if (task_p == task_p->cfg_p->curtask_p)
    117 		task_p->cfg_p->curtask_p = NULL;
    118 
    119 	if (task_p->waitable)
    120 		pthread_cond_signal(&task_p->wait_condvar);
    121 
    122 	isns_free_task(task_p);
    123 }
    124 
    125 
/*
 * isns_task_discover_server()
 *
 * Placeholder handler for server discovery: it logs its entry and ends the
 * task straight away.
 */
static void
isns_task_discover_server(struct isns_task_s *task_p)
{
	DBG("isns_task_discover_server: entered\n");

	/* discover server here -- nothing is implemented yet */

	isns_end_task(task_p);
}
    134 
    135 
    136 /*
    137  * isns_task_reconnect_server()
    138  */
    139 static void
    140 isns_task_reconnect_server(struct isns_task_s *task_p)
    141 {
    142 	struct addrinfo *ai_p;
    143 	int rv;
    144 
    145 
    146 	DBG("isns_task_reconnect_server: entered\n");
    147 
    148 	ai_p = task_p->var.reconnect_server.ai_p;
    149 
    150 	rv = isns_socket_create(&(task_p->cfg_p->sd), ai_p->ai_family,
    151 	    ai_p->ai_socktype);
    152 	if (rv != 0)
    153 		return;
    154 
    155 	rv = isns_socket_connect(task_p->cfg_p->sd, ai_p->ai_addr,
    156 	    ai_p->ai_addrlen);
    157 	if (rv != 0) {
    158 		/* Add ISNS_EVT_TIMER_RECON to kqueue */
    159 		rv = isns_change_kevent_list(task_p->cfg_p,
    160 		    (uintptr_t)ISNS_EVT_TIMER_RECON, EVFILT_TIMER, EV_ADD,
    161 		    (int64_t)ISNS_EVT_TIMER_RECON_PERIOD_MS,
    162 		    (intptr_t)isns_kevent_timer_recon);
    163 		if (rv == -1)
    164 			DBG("isns_task_reconnect_server: error on "
    165 			    "isns_change_kevent_list(1)\n");
    166 	} else {
    167 		task_p->cfg_p->sd_connected = 1;
    168 
    169 		/* Add cfg_p->sd to kqueue */
    170 		rv = isns_change_kevent_list(task_p->cfg_p,
    171 		    (uintptr_t)(task_p->cfg_p->sd), EVFILT_READ,
    172 		    EV_ADD | EV_CLEAR, (int64_t)0,
    173 		    (intptr_t)isns_kevent_socket);
    174 		if (rv == -1)
    175 			DBG("isns_task_reconnect_server: error on "
    176 			    "isns_change_kevent_lists(2)\n");
    177 
    178 		isns_end_task(task_p);
    179 	}
    180 }
    181 
    182 /*
    183  * isns_task_send_pdu()
    184  *
    185  * We send all of the pdu's associated with transaction task_p->trans_p here.
    186  *
    187  * Assumptions:
    188  *	(1) task_p->trans_p->pdu_req_list is an ordered (seq_id) list of
    189  *	    related (trans_id), appropriately sized pdus to be sent. The first
    190  *	    pdu has flag ISNS_FLAG_FIRST_PDU set and the last pdu has flag
    191  *	    ISNS_FLAG_LAST_PDU set.
    192  */
    193 static void
    194 isns_task_send_pdu(struct isns_task_s *task_p)
    195 {
    196 	struct iovec *iovp;
    197 	struct isns_config_s *cfg_p;
    198 	struct isns_pdu_s *pdu_p; /* points to first pdu in pdu_req_list */
    199 	struct isns_buffer_s *buf_p;
    200 	ssize_t bytes_written;
    201 	ssize_t count;
    202 	size_t bytes_to_write;
    203 	int iovcnt, cur_iovec;
    204 	char *ptr;
    205 
    206 
    207 	DBG("isns_task_send_pdu: entered\n");
    208 
    209 	cfg_p = task_p->cfg_p;
    210 	pdu_p = task_p->var.send_pdu.pdu_p;
    211 
    212 	while (pdu_p != NULL) {
    213 		/* adjust byte order if necessary */
    214 		if (pdu_p->byteorder_host) {
    215 			pdu_p->hdr.isnsp_version = isns_htons(pdu_p->hdr.
    216 			    isnsp_version);
    217 			pdu_p->hdr.func_id = isns_htons(pdu_p->hdr.func_id);
    218 			pdu_p->hdr.payload_len = isns_htons(pdu_p->hdr.
    219 			    payload_len);
    220 			pdu_p->hdr.flags = isns_htons(pdu_p->hdr.flags);
    221 			pdu_p->hdr.trans_id = isns_htons(pdu_p->hdr.trans_id);
    222 			pdu_p->hdr.seq_id = isns_htons(pdu_p->hdr.seq_id);
    223 
    224 			pdu_p->byteorder_host = 0;
    225 		}
    226 		DUMP_PDU(pdu_p);
    227 
    228 		/* send PDU via socket here */
    229 		write_buf[0].iov_base = &(pdu_p->hdr);
    230 		write_buf[0].iov_len = sizeof(pdu_p->hdr);
    231 		bytes_to_write = write_buf[0].iov_len;
    232 		iovcnt = 1;
    233 
    234 		buf_p = pdu_p->payload_p;
    235 		while (buf_p != NULL) {
    236 			write_buf[iovcnt].iov_base = isns_buffer_data(buf_p,0);
    237 			write_buf[iovcnt].iov_len = buf_p->cur_len;
    238 			bytes_to_write += write_buf[iovcnt].iov_len;
    239 			iovcnt++;
    240 			buf_p = buf_p->next;
    241 		}
    242 
    243 		/* iovcnt and bytes_to_write are initialized */
    244 		cur_iovec = 0;
    245 		buf_p = ((struct isns_buffer_s *)(void *)pdu_p) - 1;
    246 		do {
    247 			iovp = &(write_buf[cur_iovec]);
    248 			bytes_written = isns_socket_writev(cfg_p->sd, iovp,
    249 			    iovcnt);
    250 			if (bytes_written == -1) {
    251 				DBG("isns_task_send_pdu: error on "
    252 			    	"isns_socket_writev\n");
    253 				isns_socket_close(cfg_p->sd);
    254 				cfg_p->sd_connected = 0;
    255 
    256 				isns_process_connection_loss(cfg_p);
    257 
    258 				if (cfg_p->pdu_in_p != NULL) {
    259 					isns_free_pdu(cfg_p->pdu_in_p);
    260 					cfg_p->pdu_in_p = NULL;
    261 				}
    262 
    263 				break;
    264 			}
    265 
    266 			if (bytes_written < (ssize_t)bytes_to_write) {
    267 				count = bytes_written;
    268 				while (buf_p != NULL) { /* -OR- while (1) */
    269 					if ((unsigned)count >= write_buf[
    270 					    cur_iovec].iov_len) {
    271 						count -= write_buf[cur_iovec].
    272 						    iov_len;
    273 						if (cur_iovec == 0)
    274 							buf_p = pdu_p->
    275 							    payload_p;
    276 						else
    277 							buf_p = buf_p->next;
    278 						cur_iovec++;
    279 						iovcnt--;
    280 
    281 						if (count == 0) {
    282 							/* Do another write */
    283 							break;
    284 						} else {
    285 							/* Look at new iovec */
    286 							continue;
    287 						}
    288 					} else {
    289 						write_buf[cur_iovec].iov_len -=
    290 						    count;
    291 
    292 						ptr = (char *) write_buf[cur_iovec].iov_base;
    293 						ptr += count;
    294 						write_buf[cur_iovec].iov_base = ptr;
    295 
    296 						/* Do another write */
    297 						break;
    298 					}
    299 				}
    300 			}
    301 
    302 			bytes_to_write -= bytes_written;
    303 		} while (bytes_to_write);
    304 
    305 		pdu_p = pdu_p->next;
    306 	}
    307 
    308 	if (!task_p->waitable) {
    309 		isns_complete_trans(task_p->var.send_pdu.trans_p);
    310 		isns_end_task(task_p);
    311 	}
    312 }
    313 
    314 /*
    315  * isns_task_init_socket_io()
    316  */
    317 static void
    318 isns_task_init_socket_io(struct isns_task_s *task_p)
    319 {
    320 	struct isns_config_s *cfg_p;
    321 	int rv;
    322 
    323 
    324 	DBG("isns_task_init_socket_io: entered\n");
    325 
    326 	cfg_p = task_p->cfg_p;
    327 
    328 	if (cfg_p->sd_connected) {
    329 		isns_socket_close(cfg_p->sd);
    330 		cfg_p->sd_connected = 0;
    331 
    332 		/* We may have received part of an unsolicited/duplicate pdu */
    333 		if (cfg_p->pdu_in_p != NULL) {
    334 			isns_free_pdu(cfg_p->pdu_in_p);
    335 			cfg_p->pdu_in_p = NULL;
    336 		}
    337 	}
    338 
    339 	/* May have an allocated 'struct addrinfo', whether connected or not */
    340 	if (cfg_p->ai_p != NULL) {
    341 		isns_free(cfg_p->ai_p);
    342 		cfg_p->ai_p = NULL;
    343 	}
    344 
    345 	cfg_p->sd = task_p->var.init_socket_io.sd;
    346 	cfg_p->ai_p = task_p->var.init_socket_io.ai_p;
    347 
    348 	cfg_p->sd_connected = 1;
    349 
    350 	/* Add cfg_p->sd to kqueue */
    351 	rv = isns_change_kevent_list(cfg_p, (uintptr_t)cfg_p->sd,
    352 	    EVFILT_READ, EV_ADD | EV_CLEAR, (int64_t)0,
    353 	    (intptr_t)isns_kevent_socket);
    354 	if (rv == -1)
    355 		DBG("isns_task_init_socket_io: error on "
    356 		    "isns_change_kevent_list\n");
    357 
    358 	isns_end_task(task_p);
    359 }
    360 
    361 
    362 /*
    363  * isns_task_init_refresh(struct isns_task_s *task_p)
    364  */
    365 static void
    366 isns_task_init_refresh(struct isns_task_s *task_p)
    367 {
    368 	struct isns_config_s *cfg_p;
    369 	int rval;
    370 
    371 	DBG("isns_task_init_refresh: entered\n");
    372 
    373 	/* Free any previous refresh info. */
    374 	cfg_p = task_p->cfg_p;
    375 	if (cfg_p->refresh_p != NULL) {
    376 		if (cfg_p->refresh_p->trans_p != NULL)
    377 			isns_free_trans(cfg_p->refresh_p->trans_p);
    378 		isns_free(cfg_p->refresh_p);
    379 	}
    380 
    381 	/* Assign new refresh info into config struct. */
    382 	cfg_p->refresh_p = task_p->var.init_refresh.ref_p;
    383 	cfg_p->refresh_p->trans_p = NULL;
    384 
    385 	/* Setup (or change) kevent timer for reg refresh. */
    386 	rval = isns_change_kevent_list(cfg_p,
    387 	    (uintptr_t)ISNS_EVT_TIMER_REFRESH, EVFILT_TIMER,
    388 	    EV_ADD | EV_ENABLE, (int64_t)cfg_p->refresh_p->interval * 1000,
    389 	    (intptr_t)isns_kevent_timer_refresh);
    390 	if (rval == -1) {
    391 		DBG("isns_task_init_refresh: "
    392 		    "error on isns_change_kevent_list()\n");
    393 	}
    394 
    395 	isns_end_task(task_p);
    396 }
    397 
    398 
    399 struct isns_task_s *
    400 isns_new_task(struct isns_config_s *cfg_p, uint8_t task_type, int waitable)
    401 {
    402 	struct isns_buffer_s *buf_p;
    403 	struct isns_task_s *task_p;
    404 	pthread_mutexattr_t mutexattr;
    405 	pthread_condattr_t condattr;
    406 
    407 	task_p = NULL;
    408 	buf_p = isns_new_buffer((int)sizeof(struct isns_task_s));
    409 	if (buf_p) {
    410 		task_p = (struct isns_task_s *)isns_buffer_data(buf_p, 0);
    411 		task_p->cfg_p = cfg_p;
    412 		task_p->task_type = task_type;
    413 		task_p->waitable = waitable;
    414 
    415 		if (waitable) {
    416 			pthread_mutexattr_init(&mutexattr);
    417 			pthread_mutexattr_settype(&mutexattr,
    418 			    ISNS_MUTEX_TYPE_NORMAL);
    419 			pthread_mutex_init(&task_p->wait_mutex, &mutexattr);
    420 
    421 			pthread_condattr_init(&condattr);
    422 			pthread_cond_init(&task_p->wait_condvar, &condattr);
    423 			task_p->wait_ref_count = 2;
    424 		}
    425 	}
    426 
    427 	DBG("isns_new_task: %p, waitable=%d\n", task_p, waitable);
    428 
    429 	return task_p;
    430 }
    431 
    432 
    433 void
    434 isns_free_task(struct isns_task_s *task_p)
    435 {
    436 	struct isns_buffer_s *buf_p;
    437 	int ref_count;
    438 
    439 	DBG("isns_free_task: %p\n", task_p);
    440 	if (task_p->waitable) {
    441 		pthread_mutex_lock(&task_p->wait_mutex);
    442 		ref_count = --task_p->wait_ref_count;
    443 		pthread_mutex_unlock(&task_p->wait_mutex);
    444 
    445 		if (ref_count > 0) {
    446 			DBG("isns_free_task: ref_count > 0, no free done\n");
    447 			return;
    448 		}
    449 
    450 		pthread_mutex_destroy(&task_p->wait_mutex);
    451 		pthread_cond_destroy(&task_p->wait_condvar);
    452 	}
    453 	buf_p = ((struct isns_buffer_s *)(void *)(task_p))-1;
    454 	isns_free_buffer(buf_p);
    455 }
    456 
    457 
    458 void
    459 isns_taskq_insert_head(struct isns_config_s *cfg_p,
    460     struct isns_task_s *task_p)
    461 {
    462 	pthread_mutex_lock(&cfg_p->taskq_mutex);
    463 	SIMPLEQ_INSERT_HEAD(&cfg_p->taskq_head, task_p, taskq_entry);
    464 	pthread_mutex_unlock(&cfg_p->taskq_mutex);
    465 
    466 	DBG("isns_taskq_insert_head: %p\n", task_p);
    467 }
    468 
    469 
    470 void
    471 isns_taskq_insert_tail(struct isns_config_s *cfg_p,
    472     struct isns_task_s *task_p)
    473 {
    474 	pthread_mutex_lock(&cfg_p->taskq_mutex);
    475 	SIMPLEQ_INSERT_TAIL(&cfg_p->taskq_head, task_p, taskq_entry);
    476 	pthread_mutex_unlock(&cfg_p->taskq_mutex);
    477 
    478 	DBG("isns_taskq_insert_tail: %p\n", task_p);
    479 }
    480 
    481 
    482 struct isns_task_s *
    483 isns_taskq_remove(struct isns_config_s *cfg_p)
    484 {
    485 	struct isns_task_s *task_p = NULL;
    486 
    487 	pthread_mutex_lock(&cfg_p->taskq_mutex);
    488 	if ((task_p = SIMPLEQ_FIRST(&cfg_p->taskq_head)) != NULL)
    489 		SIMPLEQ_REMOVE_HEAD(&cfg_p->taskq_head, taskq_entry);
    490 	pthread_mutex_unlock(&cfg_p->taskq_mutex);
    491 
    492 	DBG("isns_taskq_remove: %p\n", task_p);
    493 
    494 	return task_p;
    495 }
    496 
    497 
    498 struct isns_task_s *
    499 isns_taskq_remove_trans(struct isns_config_s *cfg_p, uint16_t trans_id)
    500 {
    501 	struct isns_task_s *task_p;
    502 	int trans_found;
    503 
    504 	trans_found = 0;
    505 	pthread_mutex_lock(&cfg_p->taskq_mutex);
    506 	SIMPLEQ_FOREACH(task_p, &cfg_p->taskq_head, taskq_entry) {
    507 		if ((task_p->task_type == ISNS_TASK_SEND_PDU)
    508 		    && (task_p->var.send_pdu.trans_p->id == trans_id)) {
    509 			trans_found = 1;
    510 			break;
    511 		}
    512 	}
    513 	if (trans_found) {
    514 		SIMPLEQ_REMOVE(&cfg_p->taskq_head, task_p, isns_task_s,
    515 		    taskq_entry);
    516 	}
    517 	pthread_mutex_unlock(&cfg_p->taskq_mutex);
    518 
    519 	return (trans_found ? task_p : NULL);
    520 }
    521