Home | History | Annotate | Line # | Download | only in libzfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 
     22 /*
     23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     24  * Use is subject to license terms.
     25  */
     26 
     27 #include <sys/cdefs.h>
     28 /* __FBSDID("$FreeBSD: head/cddl/compat/opensolaris/misc/thread_pool.c 275595 2014-12-08 06:10:47Z delphij $"); */
     29 
     30 #include <stdlib.h>
     31 #include <signal.h>
     32 #include <errno.h>
     33 #include "thread_pool_impl.h"
     34 
     35 typedef void (*_Voidfp)(void*); /* pointer to extern "C" function */
     36 
     37 static void
     38 delete_pool(tpool_t *tpool)
     39 {
     40 	tpool_job_t *job;
     41 
     42 	/*
     43 	 * There should be no pending jobs, but just in case...
     44 	 */
     45 	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
     46 		tpool->tp_head = job->tpj_next;
     47 		free(job);
     48 	}
     49 	(void) pthread_attr_destroy(&tpool->tp_attr);
     50 	free(tpool);
     51 }
     52 
     53 /*
     54  * Worker thread is terminating.
     55  */
     56 static void
     57 worker_cleanup(void *arg)
     58 {
     59 	tpool_t *tpool = arg;
     60 
     61 	if (--tpool->tp_current == 0 &&
     62 	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
     63 		if (tpool->tp_flags & TP_ABANDON) {
     64 			pthread_mutex_unlock(&tpool->tp_mutex);
     65 			delete_pool(tpool);
     66 			return;
     67 		}
     68 		if (tpool->tp_flags & TP_DESTROY)
     69 			(void) pthread_cond_broadcast(&tpool->tp_busycv);
     70 	}
     71 	pthread_mutex_unlock(&tpool->tp_mutex);
     72 }
     73 
     74 static void
     75 notify_waiters(tpool_t *tpool)
     76 {
     77 	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
     78 		tpool->tp_flags &= ~TP_WAIT;
     79 		(void) pthread_cond_broadcast(&tpool->tp_waitcv);
     80 	}
     81 }
     82 
     83 /*
     84  * Called by a worker thread on return from a tpool_dispatch()d job.
     85  */
     86 static void
     87 job_cleanup(void *arg)
     88 {
     89 	tpool_t *tpool = arg;
     90 	pthread_t my_tid = pthread_self();
     91 	tpool_active_t *activep;
     92 	tpool_active_t **activepp;
     93 
     94 	pthread_mutex_lock(&tpool->tp_mutex);
     95 	/* CSTYLED */
     96 	for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
     97 		activep = *activepp;
     98 		if (activep->tpa_tid == my_tid) {
     99 			*activepp = activep->tpa_next;
    100 			break;
    101 		}
    102 	}
    103 	if (tpool->tp_flags & TP_WAIT)
    104 		notify_waiters(tpool);
    105 }
    106 
    107 static void *
    108 tpool_worker(void *arg)
    109 {
    110 	tpool_t *tpool = (tpool_t *)arg;
    111 	int elapsed;
    112 	tpool_job_t *job;
    113 	void (*func)(void *);
    114 	tpool_active_t active;
    115 	sigset_t maskset;
    116 
    117 	pthread_mutex_lock(&tpool->tp_mutex);
    118 	pthread_cleanup_push(worker_cleanup, tpool);
    119 
    120 	/*
    121 	 * This is the worker's main loop.
    122 	 * It will only be left if a timeout or an error has occured.
    123 	 */
    124 	active.tpa_tid = pthread_self();
    125 	for (;;) {
    126 		elapsed = 0;
    127 		tpool->tp_idle++;
    128 		if (tpool->tp_flags & TP_WAIT)
    129 			notify_waiters(tpool);
    130 		while ((tpool->tp_head == NULL ||
    131 		    (tpool->tp_flags & TP_SUSPEND)) &&
    132 		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
    133 			if (tpool->tp_current <= tpool->tp_minimum ||
    134 			    tpool->tp_linger == 0) {
    135 				(void) pthread_cond_wait(&tpool->tp_workcv,
    136 				    &tpool->tp_mutex);
    137 			} else {
    138 				struct timespec timeout;
    139 
    140 				clock_gettime(CLOCK_MONOTONIC, &timeout);
    141 				timeout.tv_sec += tpool->tp_linger;
    142 				if (pthread_cond_timedwait(&tpool->tp_workcv,
    143 				    &tpool->tp_mutex, &timeout) != 0) {
    144 					elapsed = 1;
    145 					break;
    146 				}
    147 			}
    148 		}
    149 		tpool->tp_idle--;
    150 		if (tpool->tp_flags & TP_DESTROY)
    151 			break;
    152 		if (tpool->tp_flags & TP_ABANDON) {
    153 			/* can't abandon a suspended pool */
    154 			if (tpool->tp_flags & TP_SUSPEND) {
    155 				tpool->tp_flags &= ~TP_SUSPEND;
    156 				(void) pthread_cond_broadcast(&tpool->tp_workcv);
    157 			}
    158 			if (tpool->tp_head == NULL)
    159 				break;
    160 		}
    161 		if ((job = tpool->tp_head) != NULL &&
    162 		    !(tpool->tp_flags & TP_SUSPEND)) {
    163 			elapsed = 0;
    164 			func = job->tpj_func;
    165 			arg = job->tpj_arg;
    166 			tpool->tp_head = job->tpj_next;
    167 			if (job == tpool->tp_tail)
    168 				tpool->tp_tail = NULL;
    169 			tpool->tp_njobs--;
    170 			active.tpa_next = tpool->tp_active;
    171 			tpool->tp_active = &active;
    172 			pthread_mutex_unlock(&tpool->tp_mutex);
    173 			pthread_cleanup_push(job_cleanup, tpool);
    174 			free(job);
    175 			/*
    176 			 * Call the specified function.
    177 			 */
    178 			func(arg);
    179 			/*
    180 			 * We don't know what this thread has been doing,
    181 			 * so we reset its signal mask and cancellation
    182 			 * state back to the initial values.
    183 			 */
    184 			sigfillset(&maskset);
    185 			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
    186 			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
    187 			    NULL);
    188 			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
    189 			    NULL);
    190 			pthread_cleanup_pop(1);
    191 		}
    192 		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
    193 			/*
    194 			 * We timed out and there is no work to be done
    195 			 * and the number of workers exceeds the minimum.
    196 			 * Exit now to reduce the size of the pool.
    197 			 */
    198 			break;
    199 		}
    200 	}
    201 	pthread_cleanup_pop(1);
    202 	return (arg);
    203 }
    204 
    205 /*
    206  * Create a worker thread, with all signals blocked.
    207  */
    208 static int
    209 create_worker(tpool_t *tpool)
    210 {
    211 	sigset_t maskset, oset;
    212 	pthread_t thread;
    213 	int error;
    214 
    215 	sigfillset(&maskset);
    216 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
    217 	error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
    218 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
    219 	return (error);
    220 }
    221 
    222 tpool_t	*
    223 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
    224 	pthread_attr_t *attr)
    225 {
    226 	tpool_t	*tpool;
    227 	int error;
    228 
    229 	if (min_threads > max_threads || max_threads < 1) {
    230 		errno = EINVAL;
    231 		return (NULL);
    232 	}
    233 
    234 	tpool = calloc(1, sizeof (*tpool));
    235 	if (tpool == NULL) {
    236 		errno = ENOMEM;
    237 		return (NULL);
    238 	}
    239 	(void) pthread_mutex_init(&tpool->tp_mutex, NULL);
    240 	(void) pthread_cond_init(&tpool->tp_busycv, NULL);
    241 	(void) pthread_cond_init(&tpool->tp_workcv, NULL);
    242 	(void) pthread_cond_init(&tpool->tp_waitcv, NULL);
    243 	tpool->tp_minimum = min_threads;
    244 	tpool->tp_maximum = max_threads;
    245 	tpool->tp_linger = linger;
    246 
    247 	/* make all pool threads be detached daemon threads */
    248 	(void) pthread_attr_init(&tpool->tp_attr);
    249 	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
    250 	    PTHREAD_CREATE_DETACHED);
    251 
    252 	return (tpool);
    253 }
    254 
    255 /*
    256  * Dispatch a work request to the thread pool.
    257  * If there are idle workers, awaken one.
    258  * Else, if the maximum number of workers has
    259  * not been reached, spawn a new worker thread.
    260  * Else just return with the job added to the queue.
    261  */
    262 int
    263 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
    264 {
    265 	tpool_job_t *job;
    266 
    267 	if ((job = calloc(1, sizeof (*job))) == NULL)
    268 		return (-1);
    269 	job->tpj_next = NULL;
    270 	job->tpj_func = func;
    271 	job->tpj_arg = arg;
    272 
    273 	pthread_mutex_lock(&tpool->tp_mutex);
    274 
    275 	if (tpool->tp_head == NULL)
    276 		tpool->tp_head = job;
    277 	else
    278 		tpool->tp_tail->tpj_next = job;
    279 	tpool->tp_tail = job;
    280 	tpool->tp_njobs++;
    281 
    282 	if (!(tpool->tp_flags & TP_SUSPEND)) {
    283 		if (tpool->tp_idle > 0)
    284 			(void) pthread_cond_signal(&tpool->tp_workcv);
    285 		else if (tpool->tp_current < tpool->tp_maximum &&
    286 		    create_worker(tpool) == 0)
    287 			tpool->tp_current++;
    288 	}
    289 
    290 	pthread_mutex_unlock(&tpool->tp_mutex);
    291 	return (0);
    292 }
    293 
    294 /*
    295  * Assumes: by the time tpool_destroy() is called no one will use this
    296  * thread pool in any way and no one will try to dispatch entries to it.
    297  * Calling tpool_destroy() from a job in the pool will cause deadlock.
    298  */
    299 void
    300 tpool_destroy(tpool_t *tpool)
    301 {
    302 	tpool_active_t *activep;
    303 
    304 	pthread_mutex_lock(&tpool->tp_mutex);
    305 	pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
    306 
    307 	/* mark the pool as being destroyed; wakeup idle workers */
    308 	tpool->tp_flags |= TP_DESTROY;
    309 	tpool->tp_flags &= ~TP_SUSPEND;
    310 	(void) pthread_cond_broadcast(&tpool->tp_workcv);
    311 
    312 	/* cancel all active workers */
    313 	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
    314 		(void) pthread_cancel(activep->tpa_tid);
    315 
    316 	/* wait for all active workers to finish */
    317 	while (tpool->tp_active != NULL) {
    318 		tpool->tp_flags |= TP_WAIT;
    319 		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
    320 	}
    321 
    322 	/* the last worker to terminate will wake us up */
    323 	while (tpool->tp_current != 0)
    324 		(void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
    325 
    326 	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
    327 	delete_pool(tpool);
    328 }
    329 
    330 /*
    331  * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
    332  * The last worker to terminate will delete the pool.
    333  */
    334 void
    335 tpool_abandon(tpool_t *tpool)
    336 {
    337 
    338 	pthread_mutex_lock(&tpool->tp_mutex);
    339 	if (tpool->tp_current == 0) {
    340 		/* no workers, just delete the pool */
    341 		pthread_mutex_unlock(&tpool->tp_mutex);
    342 		delete_pool(tpool);
    343 	} else {
    344 		/* wake up all workers, last one will delete the pool */
    345 		tpool->tp_flags |= TP_ABANDON;
    346 		tpool->tp_flags &= ~TP_SUSPEND;
    347 		(void) pthread_cond_broadcast(&tpool->tp_workcv);
    348 		pthread_mutex_unlock(&tpool->tp_mutex);
    349 	}
    350 }
    351 
    352 /*
    353  * Wait for all jobs to complete.
    354  * Calling tpool_wait() from a job in the pool will cause deadlock.
    355  */
    356 void
    357 tpool_wait(tpool_t *tpool)
    358 {
    359 
    360 	pthread_mutex_lock(&tpool->tp_mutex);
    361 	pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
    362 	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
    363 		tpool->tp_flags |= TP_WAIT;
    364 		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
    365 	}
    366 	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
    367 }
    368 
    369 void
    370 tpool_suspend(tpool_t *tpool)
    371 {
    372 
    373 	pthread_mutex_lock(&tpool->tp_mutex);
    374 	tpool->tp_flags |= TP_SUSPEND;
    375 	pthread_mutex_unlock(&tpool->tp_mutex);
    376 }
    377 
    378 int
    379 tpool_suspended(tpool_t *tpool)
    380 {
    381 	int suspended;
    382 
    383 	pthread_mutex_lock(&tpool->tp_mutex);
    384 	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
    385 	pthread_mutex_unlock(&tpool->tp_mutex);
    386 
    387 	return (suspended);
    388 }
    389 
    390 void
    391 tpool_resume(tpool_t *tpool)
    392 {
    393 	int excess;
    394 
    395 	pthread_mutex_lock(&tpool->tp_mutex);
    396 	if (!(tpool->tp_flags & TP_SUSPEND)) {
    397 		pthread_mutex_unlock(&tpool->tp_mutex);
    398 		return;
    399 	}
    400 	tpool->tp_flags &= ~TP_SUSPEND;
    401 	(void) pthread_cond_broadcast(&tpool->tp_workcv);
    402 	excess = tpool->tp_njobs - tpool->tp_idle;
    403 	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
    404 		if (create_worker(tpool) != 0)
    405 			break;		/* pthread_create() failed */
    406 		tpool->tp_current++;
    407 	}
    408 	pthread_mutex_unlock(&tpool->tp_mutex);
    409 }
    410 
    411 int
    412 tpool_member(tpool_t *tpool)
    413 {
    414 	pthread_t my_tid = pthread_self();
    415 	tpool_active_t *activep;
    416 
    417 	pthread_mutex_lock(&tpool->tp_mutex);
    418 	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
    419 		if (activep->tpa_tid == my_tid) {
    420 			pthread_mutex_unlock(&tpool->tp_mutex);
    421 			return (1);
    422 		}
    423 	}
    424 	pthread_mutex_unlock(&tpool->tp_mutex);
    425 	return (0);
    426 }
    427