      1 /*	$NetBSD: kern_threadpool.c,v 1.3.2.3 2019/01/18 08:50:57 pgoyette Exp $	*/
      2 
      3 /*-
      4  * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
      5  * All rights reserved.
      6  *
      7  * This code is derived from software contributed to The NetBSD Foundation
      8  * by Taylor R. Campbell and Jason R. Thorpe.
      9  *
     10  * Redistribution and use in source and binary forms, with or without
     11  * modification, are permitted provided that the following conditions
     12  * are met:
     13  * 1. Redistributions of source code must retain the above copyright
     14  *    notice, this list of conditions and the following disclaimer.
     15  * 2. Redistributions in binary form must reproduce the above copyright
     16  *    notice, this list of conditions and the following disclaimer in the
     17  *    documentation and/or other materials provided with the distribution.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
     20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
     23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
     29  * POSSIBILITY OF SUCH DAMAGE.
     30  */
     31 
     32 /*
     33  * Thread pools.
     34  *
     35  * A thread pool is a collection of worker threads, idle or running
     36  * jobs, together with an overseer thread that does not run jobs but
     37  * can be given jobs to assign to a worker thread.  Unlike
     38  * kthread_create, scheduling a job in a thread pool does not allocate
     39  * memory or sleep, except perhaps on an adaptive lock.  Jobs reuse
     40  * threads, so they do not incur the expense of creating and
     41  * destroying kthreads unless there is little work to be done.
     42  *
     43  * A per-CPU thread pool (threadpool_percpu) is a collection of thread
     44  * pools, one per CPU bound to that CPU.  For each priority level in
     45  * use, there is one shared unbound thread pool (i.e., pool of threads
     46  * not bound to any CPU) and one shared per-CPU thread pool.
     47  *
     48  * To use the unbound thread pool at priority pri, call
     49  * threadpool_get(&pool, pri).  When you're done, call
     50  * threadpool_put(pool, pri).  (See the example sketches below.)
     51  *
     52  * To use the per-CPU thread pools at priority pri, call
     53  * threadpool_percpu_get(&pool_percpu, pri), and then use the thread
     54  * pool returned by threadpool_percpu_ref(pool_percpu) for the current
     55  * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another
     56  * CPU.  When you're done, call threadpool_percpu_put(pool_percpu,
     57  * pri).
     58  *
     59  * +--MACHINE-----------------------------------------------+
     60  * | +--CPU 0-------+ +--CPU 1-------+     +--CPU n-------+ |
     61  * | | <overseer 0> | | <overseer 1> | ... | <overseer n> | |
     62  * | | <idle 0a>    | | <running 1a> | ... | <idle na>    | |
     63  * | | <running 0b> | | <running 1b> | ... | <idle nb>    | |
     64  * | | .            | | .            | ... | .            | |
     65  * | | .            | | .            | ... | .            | |
     66  * | | .            | | .            | ... | .            | |
     67  * | +--------------+ +--------------+     +--------------+ |
     68  * |            +--unbound---------+                        |
     69  * |            | <overseer n+1>   |                        |
     70  * |            | <idle (n+1)a>    |                        |
     71  * |            | <running (n+1)b> |                        |
     72  * |            +------------------+                        |
     73  * +--------------------------------------------------------+
     74  *
     75  * XXX Why one overseer per CPU?  I did that originally to avoid
     76  * touching remote CPUs' memory when scheduling a job, but that still
     77  * requires interprocessor synchronization.  Perhaps we could get by
     78  * with a single overseer thread, at the expense of another pointer in
     79  * struct threadpool_job to identify the CPU on which it must run
     80  * in order for the overseer to schedule it correctly.
     81  */
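
        /*
         * Example (an illustrative sketch, not part of this file): a
         * hypothetical driver softc "sc" with members sc_lock (a
         * kmutex_t), sc_pool (struct threadpool *), and sc_job (struct
         * threadpool_job), plus a hypothetical work function
         * my_job_work, might use the unbound pool as follows.  The job
         * function runs with no locks held and must take the job lock
         * before calling threadpool_job_done.
         *
         *	static void
         *	my_job_work(struct threadpool_job *job)
         *	{
         *		struct my_softc *sc =
         *		    container_of(job, struct my_softc, sc_job);
         *
         *		... do the actual work ...
         *
         *		mutex_enter(&sc->sc_lock);
         *		threadpool_job_done(job);
         *		mutex_exit(&sc->sc_lock);
         *	}
         *
         *	... setup (an IPL_NONE adaptive lock is assumed) ...
         *	mutex_init(&sc->sc_lock, MUTEX_DEFAULT, IPL_NONE);
         *	error = threadpool_get(&sc->sc_pool, PRI_NONE);
         *	threadpool_job_init(&sc->sc_job, my_job_work, &sc->sc_lock,
         *	    "myjob");
         *
         *	... schedule, under the job lock ...
         *	mutex_enter(&sc->sc_lock);
         *	threadpool_schedule_job(sc->sc_pool, &sc->sc_job);
         *	mutex_exit(&sc->sc_lock);
         *
         *	... teardown ...
         *	mutex_enter(&sc->sc_lock);
         *	threadpool_cancel_job(sc->sc_pool, &sc->sc_job);
         *	mutex_exit(&sc->sc_lock);
         *	threadpool_job_destroy(&sc->sc_job);
         *	threadpool_put(sc->sc_pool, PRI_NONE);
         */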
     82 
     83 #include <sys/cdefs.h>
     84 __KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.3.2.3 2019/01/18 08:50:57 pgoyette Exp $");
     85 
     86 #include <sys/types.h>
     87 #include <sys/param.h>
     88 #include <sys/atomic.h>
     89 #include <sys/condvar.h>
     90 #include <sys/cpu.h>
     91 #include <sys/kernel.h>
     92 #include <sys/kmem.h>
     93 #include <sys/kthread.h>
     94 #include <sys/mutex.h>
     95 #include <sys/once.h>
     96 #include <sys/percpu.h>
     97 #include <sys/pool.h>
     98 #include <sys/proc.h>
     99 #include <sys/queue.h>
    100 #include <sys/systm.h>
    101 #include <sys/sysctl.h>
    102 #include <sys/threadpool.h>
    103 
    104 /* Data structures */
    105 
    106 TAILQ_HEAD(job_head, threadpool_job);
    107 TAILQ_HEAD(thread_head, threadpool_thread);
    108 
    109 struct threadpool_thread {
    110 	struct lwp			*tpt_lwp;
    111 	char				*tpt_lwp_savedname;
    112 	struct threadpool		*tpt_pool;
    113 	struct threadpool_job		*tpt_job;
    114 	kcondvar_t			tpt_cv;
    115 	TAILQ_ENTRY(threadpool_thread)	tpt_entry;
    116 };
    117 
    118 struct threadpool {
    119 	kmutex_t			tp_lock;
    120 	struct threadpool_thread	tp_overseer;
    121 	struct job_head			tp_jobs;
    122 	struct thread_head		tp_idle_threads;
    123 	uint64_t			tp_refcnt;
    124 	int				tp_flags;
    125 #define	THREADPOOL_DYING	0x01
    126 	struct cpu_info			*tp_cpu;
    127 	pri_t				tp_pri;
    128 };
    129 
    130 static void	threadpool_hold(struct threadpool *);
    131 static void	threadpool_rele(struct threadpool *);
    132 
    133 static int	threadpool_percpu_create(struct threadpool_percpu **, pri_t);
    134 static void	threadpool_percpu_destroy(struct threadpool_percpu *);
    135 
    136 static threadpool_job_fn_t threadpool_job_dead;
    137 
    138 static void	threadpool_job_hold(struct threadpool_job *);
    139 static void	threadpool_job_rele(struct threadpool_job *);
    140 
    141 static void	threadpool_overseer_thread(void *) __dead;
    142 static void	threadpool_thread(void *) __dead;
    143 
    144 static pool_cache_t	threadpool_thread_pc __read_mostly;
    145 
    146 static kmutex_t		threadpools_lock __cacheline_aligned;
    147 
    148 	/* Default to 30 second idle timeout for pool threads. */
    149 static int	threadpool_idle_time_ms = 30 * 1000;
    150 
    151 struct threadpool_unbound {
    152 	struct threadpool		tpu_pool;
    153 
    154 	/* protected by threadpools_lock */
    155 	LIST_ENTRY(threadpool_unbound)	tpu_link;
    156 	uint64_t			tpu_refcnt;
    157 };
    158 
    159 static LIST_HEAD(, threadpool_unbound) unbound_threadpools;
    160 
    161 static struct threadpool_unbound *
    162 threadpool_lookup_unbound(pri_t pri)
    163 {
    164 	struct threadpool_unbound *tpu;
    165 
    166 	LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) {
    167 		if (tpu->tpu_pool.tp_pri == pri)
    168 			return tpu;
    169 	}
    170 	return NULL;
    171 }
    172 
    173 static void
    174 threadpool_insert_unbound(struct threadpool_unbound *tpu)
    175 {
    176 	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL);
    177 	LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link);
    178 }
    179 
    180 static void
    181 threadpool_remove_unbound(struct threadpool_unbound *tpu)
    182 {
    183 	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu);
    184 	LIST_REMOVE(tpu, tpu_link);
    185 }
    186 
    187 struct threadpool_percpu {
    188 	percpu_t *			tpp_percpu;
    189 	pri_t				tpp_pri;
    190 
    191 	/* protected by threadpools_lock */
    192 	LIST_ENTRY(threadpool_percpu)	tpp_link;
    193 	uint64_t			tpp_refcnt;
    194 };
    195 
    196 static LIST_HEAD(, threadpool_percpu) percpu_threadpools;
    197 
    198 static struct threadpool_percpu *
    199 threadpool_lookup_percpu(pri_t pri)
    200 {
    201 	struct threadpool_percpu *tpp;
    202 
    203 	LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) {
    204 		if (tpp->tpp_pri == pri)
    205 			return tpp;
    206 	}
    207 	return NULL;
    208 }
    209 
    210 static void
    211 threadpool_insert_percpu(struct threadpool_percpu *tpp)
    212 {
    213 	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL);
    214 	LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link);
    215 }
    216 
    217 static void
    218 threadpool_remove_percpu(struct threadpool_percpu *tpp)
    219 {
    220 	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp);
    221 	LIST_REMOVE(tpp, tpp_link);
    222 }
    223 
    224 #ifdef THREADPOOL_VERBOSE
    225 #define	TP_LOG(x)		printf x
    226 #else
    227 #define	TP_LOG(x)		/* nothing */
    228 #endif /* THREADPOOL_VERBOSE */
    229 
    230 static int
    231 sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS)
    232 {
    233 	struct sysctlnode node;
    234 	int val, error;
    235 
    236 	node = *rnode;
    237 
    238 	val = threadpool_idle_time_ms;
    239 	node.sysctl_data = &val;
    240 	error = sysctl_lookup(SYSCTLFN_CALL(&node));
    241 	if (error == 0 && newp != NULL) {
    242 		/* Disallow negative values and 0 (forever). */
    243 		if (val < 1)
    244 			error = EINVAL;
    245 		else
    246 			threadpool_idle_time_ms = val;
    247 	}
    248 
    249 	return error;
    250 }
    251 
    252 SYSCTL_SETUP_PROTO(sysctl_threadpool_setup);
    253 
    254 SYSCTL_SETUP(sysctl_threadpool_setup,
    255     "sysctl kern.threadpool subtree setup")
    256 {
    257 	const struct sysctlnode *rnode, *cnode;
    258 	int error __diagused;
    259 
    260 	error = sysctl_createv(clog, 0, NULL, &rnode,
    261 	    CTLFLAG_PERMANENT,
    262 	    CTLTYPE_NODE, "threadpool",
    263 	    SYSCTL_DESCR("threadpool subsystem options"),
    264 	    NULL, 0, NULL, 0,
    265 	    CTL_KERN, CTL_CREATE, CTL_EOL);
    266 	KASSERT(error == 0);
    267 
    268 	error = sysctl_createv(clog, 0, &rnode, &cnode,
    269 	    CTLFLAG_PERMANENT | CTLFLAG_READWRITE,
    270 	    CTLTYPE_INT, "idle_ms",
    271 	    SYSCTL_DESCR("idle thread timeout in ms"),
    272 	    sysctl_kern_threadpool_idle_ms, 0, NULL, 0,
    273 	    CTL_CREATE, CTL_EOL);
    274 	KASSERT(error == 0);
    275 }
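
        /*
         * The nodes created above expose kern.threadpool.idle_ms
         * (threadpool_idle_time_ms, default 30000 ms).  For example, it
         * can be changed at run time with sysctl(8):
         *
         *	sysctl -w kern.threadpool.idle_ms=10000
         */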
    276 
    277 void
    278 threadpools_init(void)
    279 {
    280 
    281 	threadpool_thread_pc =
    282 	    pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0,
    283 		"thplthrd", NULL, IPL_NONE, NULL, NULL, NULL);
    284 
    285 	LIST_INIT(&unbound_threadpools);
    286 	LIST_INIT(&percpu_threadpools);
    287 	mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);
    288 }
    289 
    290 /* Thread pool creation */
    291 
    292 static bool
    293 threadpool_pri_is_valid(pri_t pri)
    294 {
    295 	return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT));
    296 }
    297 
    298 static int
    299 threadpool_create(struct threadpool *const pool, struct cpu_info *ci,
    300     pri_t pri)
    301 {
    302 	struct lwp *lwp;
    303 	int ktflags;
    304 	int error;
    305 
    306 	KASSERT(threadpool_pri_is_valid(pri));
    307 
    308 	mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
    309 	/* XXX overseer */
    310 	TAILQ_INIT(&pool->tp_jobs);
    311 	TAILQ_INIT(&pool->tp_idle_threads);
    312 	pool->tp_refcnt = 1;		/* overseer's reference */
    313 	pool->tp_flags = 0;
    314 	pool->tp_cpu = ci;
    315 	pool->tp_pri = pri;
    316 
    317 	pool->tp_overseer.tpt_lwp = NULL;
    318 	pool->tp_overseer.tpt_pool = pool;
    319 	pool->tp_overseer.tpt_job = NULL;
    320 	cv_init(&pool->tp_overseer.tpt_cv, "poolover");
    321 
    322 	ktflags = 0;
    323 	ktflags |= KTHREAD_MPSAFE;
    324 	if (pri < PRI_KERNEL)
    325 		ktflags |= KTHREAD_TS;
    326 	error = kthread_create(pri, ktflags, ci, &threadpool_overseer_thread,
    327 	    &pool->tp_overseer, &lwp,
    328 	    "pooloverseer/%d@%d", (ci ? cpu_index(ci) : -1), (int)pri);
    329 	if (error)
    330 		goto fail0;
    331 
    332 	mutex_spin_enter(&pool->tp_lock);
    333 	pool->tp_overseer.tpt_lwp = lwp;
    334 	cv_broadcast(&pool->tp_overseer.tpt_cv);
    335 	mutex_spin_exit(&pool->tp_lock);
    336 
    337 	return 0;
    338 
    339 fail0:	KASSERT(error);
    340 	KASSERT(pool->tp_overseer.tpt_job == NULL);
    341 	KASSERT(pool->tp_overseer.tpt_pool == pool);
    342 	KASSERT(pool->tp_flags == 0);
    343 	KASSERT(pool->tp_refcnt == 0);
    344 	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
    345 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
    346 	KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
    347 	cv_destroy(&pool->tp_overseer.tpt_cv);
    348 	mutex_destroy(&pool->tp_lock);
    349 	return error;
    350 }
    351 
    352 /* Thread pool destruction */
    353 
    354 static void
    355 threadpool_destroy(struct threadpool *pool)
    356 {
    357 	struct threadpool_thread *thread;
    358 
    359 	/* Mark the pool dying and wait for threads to commit suicide.  */
    360 	mutex_spin_enter(&pool->tp_lock);
    361 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
    362 	pool->tp_flags |= THREADPOOL_DYING;
    363 	cv_broadcast(&pool->tp_overseer.tpt_cv);
    364 	TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
    365 		cv_broadcast(&thread->tpt_cv);
    366 	while (0 < pool->tp_refcnt) {
    367 		TP_LOG(("%s: draining %" PRIu64 " references...\n", __func__,
    368 		    pool->tp_refcnt));
    369 		cv_wait(&pool->tp_overseer.tpt_cv, &pool->tp_lock);
    370 	}
    371 	mutex_spin_exit(&pool->tp_lock);
    372 
    373 	KASSERT(pool->tp_overseer.tpt_job == NULL);
    374 	KASSERT(pool->tp_overseer.tpt_pool == pool);
    375 	KASSERT(pool->tp_flags == THREADPOOL_DYING);
    376 	KASSERT(pool->tp_refcnt == 0);
    377 	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
    378 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
    379 	KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
    380 	cv_destroy(&pool->tp_overseer.tpt_cv);
    381 	mutex_destroy(&pool->tp_lock);
    382 }
    383 
    384 static void
    385 threadpool_hold(struct threadpool *pool)
    386 {
    387 
    388 	KASSERT(mutex_owned(&pool->tp_lock));
    389 	pool->tp_refcnt++;
    390 	KASSERT(pool->tp_refcnt != 0);
    391 }
    392 
    393 static void
    394 threadpool_rele(struct threadpool *pool)
    395 {
    396 
    397 	KASSERT(mutex_owned(&pool->tp_lock));
    398 	KASSERT(0 < pool->tp_refcnt);
    399 	if (--pool->tp_refcnt == 0)
    400 		cv_broadcast(&pool->tp_overseer.tpt_cv);
    401 }
    402 
    403 /* Unbound thread pools */
    404 
    405 int
    406 threadpool_get(struct threadpool **poolp, pri_t pri)
    407 {
    408 	struct threadpool_unbound *tpu, *tmp = NULL;
    409 	int error;
    410 
    411 	ASSERT_SLEEPABLE();
    412 
    413 	if (! threadpool_pri_is_valid(pri))
    414 		return EINVAL;
    415 
    416 	mutex_enter(&threadpools_lock);
    417 	tpu = threadpool_lookup_unbound(pri);
    418 	if (tpu == NULL) {
    419 		mutex_exit(&threadpools_lock);
    420 		TP_LOG(("%s: No pool for pri=%d, creating one.\n",
    421 		    __func__, (int)pri));
    422 		tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP);
    423 		error = threadpool_create(&tmp->tpu_pool, NULL, pri);
    424 		if (error) {
    425 			kmem_free(tmp, sizeof(*tmp));
    426 			return error;
    427 		}
    428 		mutex_enter(&threadpools_lock);
    429 		tpu = threadpool_lookup_unbound(pri);
    430 		if (tpu == NULL) {
    431 			TP_LOG(("%s: Won the creation race for pri=%d.\n",
    432 			    __func__, (int)pri));
    433 			tpu = tmp;
    434 			tmp = NULL;
    435 			threadpool_insert_unbound(tpu);
    436 		}
    437 	}
    438 	KASSERT(tpu != NULL);
    439 	tpu->tpu_refcnt++;
    440 	KASSERT(tpu->tpu_refcnt != 0);
    441 	mutex_exit(&threadpools_lock);
    442 
    443 	if (tmp != NULL) {
    444 		threadpool_destroy(&tmp->tpu_pool);
    445 		kmem_free(tmp, sizeof(*tmp));
    446 	}
    447 	KASSERT(tpu != NULL);
    448 	*poolp = &tpu->tpu_pool;
    449 	return 0;
    450 }
    451 
    452 void
    453 threadpool_put(struct threadpool *pool, pri_t pri)
    454 {
    455 	struct threadpool_unbound *tpu =
    456 	    container_of(pool, struct threadpool_unbound, tpu_pool);
    457 
    458 	ASSERT_SLEEPABLE();
    459 
    460 	KASSERT(threadpool_pri_is_valid(pri));
    461 
    462 	mutex_enter(&threadpools_lock);
    463 	KASSERT(tpu == threadpool_lookup_unbound(pri));
    464 	KASSERT(0 < tpu->tpu_refcnt);
    465 	if (--tpu->tpu_refcnt == 0) {
    466 		TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
    467 		    __func__, (int)pri));
    468 		threadpool_remove_unbound(tpu);
    469 	} else {
    470 		tpu = NULL;
    471 	}
    472 	mutex_exit(&threadpools_lock);
    473 
    474 	if (tpu) {
    475 		threadpool_destroy(&tpu->tpu_pool);
    476 		kmem_free(tpu, sizeof(*tpu));
    477 	}
    478 }
    479 
    480 /* Per-CPU thread pools */
    481 
    482 int
    483 threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri)
    484 {
    485 	struct threadpool_percpu *pool_percpu, *tmp = NULL;
    486 	int error;
    487 
    488 	ASSERT_SLEEPABLE();
    489 
    490 	if (! threadpool_pri_is_valid(pri))
    491 		return EINVAL;
    492 
    493 	mutex_enter(&threadpools_lock);
    494 	pool_percpu = threadpool_lookup_percpu(pri);
    495 	if (pool_percpu == NULL) {
    496 		mutex_exit(&threadpools_lock);
    497 		TP_LOG(("%s: No pool for pri=%d, creating one.\n",
    498 		    __func__, (int)pri));
    499 		error = threadpool_percpu_create(&tmp, pri);
    500 		if (error)
    501 			return error;
    502 		KASSERT(tmp != NULL);
    503 		mutex_enter(&threadpools_lock);
    504 		pool_percpu = threadpool_lookup_percpu(pri);
    505 		if (pool_percpu == NULL) {
    506 			TP_LOG(("%s: Won the creation race for pri=%d.\n",
    507 			    __func__, (int)pri));
    508 			pool_percpu = tmp;
    509 			tmp = NULL;
    510 			threadpool_insert_percpu(pool_percpu);
    511 		}
    512 	}
    513 	KASSERT(pool_percpu != NULL);
    514 	pool_percpu->tpp_refcnt++;
    515 	KASSERT(pool_percpu->tpp_refcnt != 0);
    516 	mutex_exit(&threadpools_lock);
    517 
    518 	if (tmp != NULL)
    519 		threadpool_percpu_destroy(tmp);
    520 	KASSERT(pool_percpu != NULL);
    521 	*pool_percpup = pool_percpu;
    522 	return 0;
    523 }
    524 
    525 void
    526 threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri)
    527 {
    528 
    529 	ASSERT_SLEEPABLE();
    530 
    531 	KASSERT(threadpool_pri_is_valid(pri));
    532 
    533 	mutex_enter(&threadpools_lock);
    534 	KASSERT(pool_percpu == threadpool_lookup_percpu(pri));
    535 	KASSERT(0 < pool_percpu->tpp_refcnt);
    536 	if (--pool_percpu->tpp_refcnt == 0) {
    537 		TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
    538 		    __func__, (int)pri));
    539 		threadpool_remove_percpu(pool_percpu);
    540 	} else {
    541 		pool_percpu = NULL;
    542 	}
    543 	mutex_exit(&threadpools_lock);
    544 
    545 	if (pool_percpu)
    546 		threadpool_percpu_destroy(pool_percpu);
    547 }
    548 
    549 struct threadpool *
    550 threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
    551 {
    552 	struct threadpool **poolp, *pool;
    553 
    554 	poolp = percpu_getref(pool_percpu->tpp_percpu);
    555 	pool = *poolp;
    556 	percpu_putref(pool_percpu->tpp_percpu);
    557 
    558 	return pool;
    559 }
    560 
    561 struct threadpool *
    562 threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu,
    563     struct cpu_info *ci)
    564 {
    565 	struct threadpool **poolp, *pool;
    566 
    567 	percpu_traverse_enter();
    568 	poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
    569 	pool = *poolp;
    570 	percpu_traverse_exit();
    571 
    572 	return pool;
    573 }
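
        /*
         * Example (sketch, reusing the hypothetical "sc" from the sketch
         * near the top of the file, with an added sc_pool_percpu member
         * of type struct threadpool_percpu *): with per-CPU pools, take
         * a reference once, then pick the bound pool for the desired CPU
         * each time a job is scheduled.
         *
         *	error = threadpool_percpu_get(&sc->sc_pool_percpu, PRI_NONE);
         *	...
         *	struct threadpool *pool =
         *	    threadpool_percpu_ref(sc->sc_pool_percpu);
         *	... or, for some other CPU ci ...
         *	struct threadpool *pool =
         *	    threadpool_percpu_ref_remote(sc->sc_pool_percpu, ci);
         *
         *	mutex_enter(&sc->sc_lock);
         *	threadpool_schedule_job(pool, &sc->sc_job);
         *	mutex_exit(&sc->sc_lock);
         *	...
         *	threadpool_percpu_put(sc->sc_pool_percpu, PRI_NONE);
         */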
    574 
    575 static int
    576 threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri)
    577 {
    578 	struct threadpool_percpu *pool_percpu;
    579 	struct cpu_info *ci;
    580 	CPU_INFO_ITERATOR cii;
    581 	unsigned int i, j;
    582 	int error;
    583 
    584 	pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP);
    585 	if (pool_percpu == NULL) {
    586 		error = ENOMEM;
    587 		goto fail0;
    588 	}
    589 	pool_percpu->tpp_pri = pri;
    590 
    591 	pool_percpu->tpp_percpu = percpu_alloc(sizeof(struct threadpool *));
    592 	if (pool_percpu->tpp_percpu == NULL) {
    593 		error = ENOMEM;
    594 		goto fail1;
    595 	}
    596 
    597 	for (i = 0, CPU_INFO_FOREACH(cii, ci), i++) {
    598 		struct threadpool *pool;
    599 
    600 		pool = kmem_zalloc(sizeof(*pool), KM_SLEEP);
    601 		error = threadpool_create(pool, ci, pri);
    602 		if (error) {
    603 			kmem_free(pool, sizeof(*pool));
    604 			goto fail2;
    605 		}
    606 		percpu_traverse_enter();
    607 		struct threadpool **const poolp =
    608 		    percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
    609 		*poolp = pool;
    610 		percpu_traverse_exit();
    611 	}
    612 
    613 	/* Success!  */
    614 	*pool_percpup = pool_percpu;
    615 	return 0;
    616 
    617 fail2:	for (j = 0, CPU_INFO_FOREACH(cii, ci), j++) {
    618 		if (i <= j)
    619 			break;
    620 		percpu_traverse_enter();
    621 		struct threadpool **const poolp =
    622 		    percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
    623 		struct threadpool *const pool = *poolp;
    624 		percpu_traverse_exit();
    625 		threadpool_destroy(pool);
    626 		kmem_free(pool, sizeof(*pool));
    627 	}
    628 	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
    629 fail1:	kmem_free(pool_percpu, sizeof(*pool_percpu));
    630 fail0:	return error;
    631 }
    632 
    633 static void
    634 threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
    635 {
    636 	struct cpu_info *ci;
    637 	CPU_INFO_ITERATOR cii;
    638 
    639 	for (CPU_INFO_FOREACH(cii, ci)) {
    640 		percpu_traverse_enter();
    641 		struct threadpool **const poolp =
    642 		    percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
    643 		struct threadpool *const pool = *poolp;
    644 		percpu_traverse_exit();
    645 		threadpool_destroy(pool);
    646 		kmem_free(pool, sizeof(*pool));
    647 	}
    648 
    649 	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
    650 	kmem_free(pool_percpu, sizeof(*pool_percpu));
    651 }
    652 
    653 /* Thread pool jobs */
    654 
    655 void __printflike(4,5)
    656 threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
    657     kmutex_t *lock, const char *fmt, ...)
    658 {
    659 	va_list ap;
    660 
    661 	va_start(ap, fmt);
    662 	(void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
    663 	va_end(ap);
    664 
    665 	job->job_lock = lock;
    666 	job->job_thread = NULL;
    667 	job->job_refcnt = 0;
    668 	cv_init(&job->job_cv, job->job_name);
    669 	job->job_fn = fn;
    670 }
    671 
    672 static void
    673 threadpool_job_dead(struct threadpool_job *job)
    674 {
    675 
    676 	panic("threadpool job %p ran after destruction", job);
    677 }
    678 
    679 void
    680 threadpool_job_destroy(struct threadpool_job *job)
    681 {
    682 
    683 	ASSERT_SLEEPABLE();
    684 
    685 	KASSERTMSG((job->job_thread == NULL), "job %p still running", job);
    686 
    687 	mutex_enter(job->job_lock);
    688 	while (0 < job->job_refcnt)
    689 		cv_wait(&job->job_cv, job->job_lock);
    690 	mutex_exit(job->job_lock);
    691 
    692 	job->job_lock = NULL;
    693 	KASSERT(job->job_thread == NULL);
    694 	KASSERT(job->job_refcnt == 0);
    695 	KASSERT(!cv_has_waiters(&job->job_cv));
    696 	cv_destroy(&job->job_cv);
    697 	job->job_fn = threadpool_job_dead;
    698 	(void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
    699 }
    700 
    701 static void
    702 threadpool_job_hold(struct threadpool_job *job)
    703 {
    704 	unsigned int refcnt;
    705 
    706 	do {
    707 		refcnt = job->job_refcnt;
    708 		KASSERT(refcnt != UINT_MAX);
    709 	} while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1))
    710 	    != refcnt);
    711 }
    712 
    713 static void
    714 threadpool_job_rele(struct threadpool_job *job)
    715 {
    716 	unsigned int refcnt;
    717 
    718 	KASSERT(mutex_owned(job->job_lock));
    719 
    720 	do {
    721 		refcnt = job->job_refcnt;
    722 		KASSERT(0 < refcnt);
    723 		if (refcnt == 1) {
    724 			refcnt = atomic_dec_uint_nv(&job->job_refcnt);
    725 			KASSERT(refcnt != UINT_MAX);
    726 			if (refcnt == 0)
    727 				cv_broadcast(&job->job_cv);
    728 			return;
    729 		}
    730 	} while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt - 1))
    731 	    != refcnt);
    732 }
    733 
    734 void
    735 threadpool_job_done(struct threadpool_job *job)
    736 {
    737 
    738 	KASSERT(mutex_owned(job->job_lock));
    739 	KASSERT(job->job_thread != NULL);
    740 	KASSERT(job->job_thread->tpt_lwp == curlwp);
    741 
    742 	/*
    743 	 * We can safely read tpt_lwp_savedname; it's set right before
    744 	 * we call the job work function, and we are only preserving it
    745 	 * to use here; no one cares if it contains junk afterward.
    746 	 */
    747 	lwp_lock(curlwp);
    748 	curlwp->l_name = job->job_thread->tpt_lwp_savedname;
    749 	lwp_unlock(curlwp);
    750 
    751 	/*
    752 	 * Inline the work of threadpool_job_rele(); the job is already
    753 	 * locked, the most likely scenario (XXXJRT only scenario?) is
    754 	 * that we're dropping the last reference (the one taken in
    755 	 * threadpool_schedule_job()), and we always do the cv_broadcast()
    756 	 * anyway.
    757 	 */
    758 	KASSERT(0 < job->job_refcnt);
    759 	unsigned int refcnt __diagused = atomic_dec_uint_nv(&job->job_refcnt);
    760 	KASSERT(refcnt != UINT_MAX);
    761 	cv_broadcast(&job->job_cv);
    762 	job->job_thread = NULL;
    763 }
    764 
    765 void
    766 threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
    767 {
    768 
    769 	KASSERT(mutex_owned(job->job_lock));
    770 
    771 	/*
    772 	 * If the job's already running, let it keep running.  The job
    773 	 * is guaranteed by the interlock not to end early -- if it had
    774 	 * ended early, threadpool_job_done would have set job_thread
    775 	 * to NULL under the interlock.
    776 	 */
    777 	if (__predict_true(job->job_thread != NULL)) {
    778 		TP_LOG(("%s: job '%s' already running.\n",
    779 		    __func__, job->job_name));
    780 		return;
    781 	}
    782 
    783 	threadpool_job_hold(job);
    784 
    785 	/* Otherwise, try to assign a thread to the job.  */
    786 	mutex_spin_enter(&pool->tp_lock);
    787 	if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
    788 		/* Nobody's idle.  Give it to the overseer.  */
    789 		TP_LOG(("%s: giving job '%s' to overseer.\n",
    790 		    __func__, job->job_name));
    791 		job->job_thread = &pool->tp_overseer;
    792 		TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
    793 	} else {
    794 		/* Assign it to the first idle thread.  */
    795 		job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
    796 		TP_LOG(("%s: giving job '%s' to idle thread %p.\n",
    797 		    __func__, job->job_name, job->job_thread));
    798 		TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
    799 		    tpt_entry);
    800 		job->job_thread->tpt_job = job;
    801 	}
    802 
    803 	/* Notify whomever we gave it to, overseer or idle thread.  */
    804 	KASSERT(job->job_thread != NULL);
    805 	cv_broadcast(&job->job_thread->tpt_cv);
    806 	mutex_spin_exit(&pool->tp_lock);
    807 }
    808 
    809 bool
    810 threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
    811 {
    812 
    813 	KASSERT(mutex_owned(job->job_lock));
    814 
    815 	/*
    816 	 * XXXJRT This fails (albeit safely) when all of the following
    817 	 * are true:
    818 	 *
    819 	 *	=> "pool" is something other than what the job was
    820 	 *	   scheduled on.  This can legitimately occur if,
    821 	 *	   for example, a job is percpu-scheduled on CPU0
    822 	 *	   and then CPU1 attempts to cancel it without taking
    823 	 *	   a remote pool reference.  (this might happen by
    824 	 *	   "luck of the draw").
    825 	 *
    826 	 *	=> "job" is not yet running, but is assigned to the
    827 	 *	   overseer.
    828 	 *
    829 	 * When this happens, this code makes the determination that
    830 	 * the job is already running.  The failure mode is that the
    831 	 * caller is told the job is running, and thus has to wait.
    832 	 * The overseer will eventually get to it and the job will
    833 	 * proceed as if it had been already running.
    834 	 */
    835 
    836 	if (job->job_thread == NULL) {
    837 		/* Nothing to do.  Guaranteed not running.  */
    838 		return true;
    839 	} else if (job->job_thread == &pool->tp_overseer) {
    840 		/* Take it off the list to guarantee it won't run.  */
    841 		job->job_thread = NULL;
    842 		mutex_spin_enter(&pool->tp_lock);
    843 		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
    844 		mutex_spin_exit(&pool->tp_lock);
    845 		threadpool_job_rele(job);
    846 		return true;
    847 	} else {
    848 		/* Too late -- already running.  */
    849 		return false;
    850 	}
    851 }
    852 
    853 void
    854 threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
    855 {
    856 
    857 	ASSERT_SLEEPABLE();
    858 
    859 	KASSERT(mutex_owned(job->job_lock));
    860 
    861 	if (threadpool_cancel_job_async(pool, job))
    862 		return;
    863 
    864 	/* Already running.  Wait for it to complete.  */
    865 	while (job->job_thread != NULL)
    866 		cv_wait(&job->job_cv, job->job_lock);
    867 }
    868 
    869 /* Thread pool overseer thread */
    870 
    871 static void __dead
    872 threadpool_overseer_thread(void *arg)
    873 {
    874 	struct threadpool_thread *const overseer = arg;
    875 	struct threadpool *const pool = overseer->tpt_pool;
    876 	struct lwp *lwp = NULL;
    877 	int ktflags;
    878 	int error;
    879 
    880 	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
    881 
    882 	/* Wait until we're initialized.  */
    883 	mutex_spin_enter(&pool->tp_lock);
    884 	while (overseer->tpt_lwp == NULL)
    885 		cv_wait(&overseer->tpt_cv, &pool->tp_lock);
    886 
    887 	TP_LOG(("%s: starting.\n", __func__));
    888 
    889 	for (;;) {
    890 		/* Wait until there's a job.  */
    891 		while (TAILQ_EMPTY(&pool->tp_jobs)) {
    892 			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
    893 				TP_LOG(("%s: THREADPOOL_DYING\n",
    894 				    __func__));
    895 				break;
    896 			}
    897 			cv_wait(&overseer->tpt_cv, &pool->tp_lock);
    898 		}
    899 		if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
    900 			break;
    901 
    902 		/* If there are no idle threads, we'll have to try to start one.  */
    903 		if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
    904 			TP_LOG(("%s: Got a job, need to create a thread.\n",
    905 			    __func__));
    906 			threadpool_hold(pool);
    907 			mutex_spin_exit(&pool->tp_lock);
    908 
    909 			struct threadpool_thread *const thread =
    910 			    pool_cache_get(threadpool_thread_pc, PR_WAITOK);
    911 			thread->tpt_lwp = NULL;
    912 			thread->tpt_pool = pool;
    913 			thread->tpt_job = NULL;
    914 			cv_init(&thread->tpt_cv, "poolthrd");
    915 
    916 			ktflags = 0;
    917 			ktflags |= KTHREAD_MPSAFE;
    918 			if (pool->tp_pri < PRI_KERNEL)
    919 				ktflags |= KTHREAD_TS;
    920 			error = kthread_create(pool->tp_pri, ktflags,
    921 			    pool->tp_cpu, &threadpool_thread, thread, &lwp,
    922 			    "poolthread/%d@%d",
    923 			    (pool->tp_cpu ? cpu_index(pool->tp_cpu) : -1),
    924 			    (int)pool->tp_pri);
    925 
    926 			mutex_spin_enter(&pool->tp_lock);
    927 			if (error) {
    928 				pool_cache_put(threadpool_thread_pc, thread);
    929 				threadpool_rele(pool);
    930 				/* XXX What to do to wait for memory?  */
    931 				(void)kpause("thrdplcr", false, hz,
    932 				    &pool->tp_lock);
    933 				continue;
    934 			}
    935 			/*
    936 			 * New kthread now owns the reference to the pool
    937 			 * taken above.
    938 			 */
    939 			KASSERT(lwp != NULL);
    940 			TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread,
    941 			    tpt_entry);
    942 			thread->tpt_lwp = lwp;
    943 			lwp = NULL;
    944 			cv_broadcast(&thread->tpt_cv);
    945 			continue;
    946 		}
    947 
    948 		/* There are idle threads, so try giving one a job.  */
    949 		struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);
    950 		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
    951 		/*
    952 		 * Take an extra reference on the job temporarily so that
    953 		 * it won't disappear on us while we have both locks dropped.
    954 		 */
    955 		threadpool_job_hold(job);
    956 		mutex_spin_exit(&pool->tp_lock);
    957 
    958 		mutex_enter(job->job_lock);
    959 		/* If the job was cancelled, we'll no longer be its thread.  */
    960 		if (__predict_true(job->job_thread == overseer)) {
    961 			mutex_spin_enter(&pool->tp_lock);
    962 			if (__predict_false(
    963 				    TAILQ_EMPTY(&pool->tp_idle_threads))) {
    964 				/*
    965 				 * Someone else snagged the thread
    966 				 * first.  We'll have to try again.
    967 				 */
    968 				TP_LOG(("%s: '%s' lost race to use idle thread.\n",
    969 				    __func__, job->job_name));
    970 				TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
    971 				    job_entry);
    972 			} else {
    973 				/*
    974 				 * Assign the job to the thread and
    975 				 * wake the thread so it starts work.
    976 				 */
    977 				struct threadpool_thread *const thread =
    978 				    TAILQ_FIRST(&pool->tp_idle_threads);
    979 
    980 				TP_LOG(("%s: '%s' gets thread %p\n",
    981 				    __func__, job->job_name, thread));
    982 				KASSERT(thread->tpt_job == NULL);
    983 				TAILQ_REMOVE(&pool->tp_idle_threads, thread,
    984 				    tpt_entry);
    985 				thread->tpt_job = job;
    986 				job->job_thread = thread;
    987 				cv_broadcast(&thread->tpt_cv);
    988 			}
    989 			mutex_spin_exit(&pool->tp_lock);
    990 		}
    991 		threadpool_job_rele(job);
    992 		mutex_exit(job->job_lock);
    993 
    994 		mutex_spin_enter(&pool->tp_lock);
    995 	}
    996 	threadpool_rele(pool);
    997 	mutex_spin_exit(&pool->tp_lock);
    998 
    999 	TP_LOG(("%s: exiting.\n", __func__));
   1000 
   1001 	kthread_exit(0);
   1002 }
   1003 
   1004 /* Thread pool thread */
   1005 
   1006 static void __dead
   1007 threadpool_thread(void *arg)
   1008 {
   1009 	struct threadpool_thread *const thread = arg;
   1010 	struct threadpool *const pool = thread->tpt_pool;
   1011 
   1012 	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
   1013 
   1014 	/* Wait until we're initialized and on the queue.  */
   1015 	mutex_spin_enter(&pool->tp_lock);
   1016 	while (thread->tpt_lwp == NULL)
   1017 		cv_wait(&thread->tpt_cv, &pool->tp_lock);
   1018 
   1019 	TP_LOG(("%s: starting.\n", __func__));
   1020 
   1021 	KASSERT(thread->tpt_lwp == curlwp);
   1022 	for (;;) {
   1023 		/* Wait until we are assigned a job.  */
   1024 		while (thread->tpt_job == NULL) {
   1025 			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
   1026 				TP_LOG(("%s: THREADPOOL_DYING\n",
   1027 				    __func__));
   1028 				break;
   1029 			}
   1030 			if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
   1031 				mstohz(threadpool_idle_time_ms)))
   1032 				break;
   1033 		}
   1034 		if (__predict_false(thread->tpt_job == NULL)) {
   1035 			TAILQ_REMOVE(&pool->tp_idle_threads, thread,
   1036 			    tpt_entry);
   1037 			break;
   1038 		}
   1039 
   1040 		struct threadpool_job *const job = thread->tpt_job;
   1041 		KASSERT(job != NULL);
   1042 
   1043 		/* Set our lwp name to reflect what job we're doing.  */
   1044 		lwp_lock(curlwp);
   1045 		char *const lwp_name __diagused = curlwp->l_name;
   1046 		thread->tpt_lwp_savedname = curlwp->l_name;
   1047 		curlwp->l_name = job->job_name;
   1048 		lwp_unlock(curlwp);
   1049 
   1050 		mutex_spin_exit(&pool->tp_lock);
   1051 
   1052 		TP_LOG(("%s: running job '%s' on thread %p.\n",
   1053 		    __func__, job->job_name, thread));
   1054 
   1055 		/* Run the job.  */
   1056 		(*job->job_fn)(job);
   1057 
   1058 		/* lwp name restored in threadpool_job_done(). */
   1059 		KASSERTMSG((curlwp->l_name == lwp_name),
   1060 		    "someone forgot to call threadpool_job_done()!");
   1061 
   1062 		/*
   1063 		 * We can compare pointers, but we can no longer dereference
   1064 		 * job after this because threadpool_job_done() drops the
   1065 		 * last reference on the job while the job is locked.
   1066 		 */
   1067 
   1068 		mutex_spin_enter(&pool->tp_lock);
   1069 		KASSERT(thread->tpt_job == job);
   1070 		thread->tpt_job = NULL;
   1071 		TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
   1072 	}
   1073 	threadpool_rele(pool);
   1074 	mutex_spin_exit(&pool->tp_lock);
   1075 
   1076 	TP_LOG(("%s: thread %p exiting.\n", __func__, thread));
   1077 
   1078 	KASSERT(!cv_has_waiters(&thread->tpt_cv));
   1079 	cv_destroy(&thread->tpt_cv);
   1080 	pool_cache_put(threadpool_thread_pc, thread);
   1081 	kthread_exit(0);
   1082 }
   1083