      1 /*	$NetBSD: kern_threadpool.c,v 1.16 2020/02/09 22:57:26 riastradh Exp $	*/
      2 
      3 /*-
      4  * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
      5  * All rights reserved.
      6  *
      7  * This code is derived from software contributed to The NetBSD Foundation
      8  * by Taylor R. Campbell and Jason R. Thorpe.
      9  *
     10  * Redistribution and use in source and binary forms, with or without
     11  * modification, are permitted provided that the following conditions
     12  * are met:
     13  * 1. Redistributions of source code must retain the above copyright
     14  *    notice, this list of conditions and the following disclaimer.
     15  * 2. Redistributions in binary form must reproduce the above copyright
     16  *    notice, this list of conditions and the following disclaimer in the
     17  *    documentation and/or other materials provided with the distribution.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
     20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
     23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
     29  * POSSIBILITY OF SUCH DAMAGE.
     30  */
     31 
     32 /*
     33  * Thread pools.
     34  *
     35  * A thread pool is a collection of worker threads idle or running
     36  * jobs, together with an overseer thread that does not run jobs but
     37  * can be given jobs to assign to a worker thread.  Scheduling a job in
     38  * a thread pool does not allocate or even sleep at all, except perhaps
     39  * on an adaptive lock, unlike kthread_create.  Jobs reuse threads, so
     40  * they do not incur the expense of creating and destroying kthreads
     41  * unless there is not much work to be done.
     42  *
     43  * A per-CPU thread pool (threadpool_percpu) is a collection of thread
      44  * pools, one per CPU, bound to that CPU.  For each priority level in
     45  * use, there is one shared unbound thread pool (i.e., pool of threads
     46  * not bound to any CPU) and one shared per-CPU thread pool.
     47  *
     48  * To use the unbound thread pool at priority pri, call
     49  * threadpool_get(&pool, pri).  When you're done, call
     50  * threadpool_put(pool, pri).
     51  *
     52  * To use the per-CPU thread pools at priority pri, call
     53  * threadpool_percpu_get(&pool_percpu, pri), and then use the thread
     54  * pool returned by threadpool_percpu_ref(pool_percpu) for the current
     55  * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another
     56  * CPU.  When you're done, call threadpool_percpu_put(pool_percpu,
     57  * pri).
     58  *
     59  * +--MACHINE-----------------------------------------------+
     60  * | +--CPU 0-------+ +--CPU 1-------+     +--CPU n-------+ |
     61  * | | <overseer 0> | | <overseer 1> | ... | <overseer n> | |
     62  * | | <idle 0a>    | | <running 1a> | ... | <idle na>    | |
     63  * | | <running 0b> | | <running 1b> | ... | <idle nb>    | |
     64  * | | .            | | .            | ... | .            | |
     65  * | | .            | | .            | ... | .            | |
     66  * | | .            | | .            | ... | .            | |
     67  * | +--------------+ +--------------+     +--------------+ |
     68  * |            +--unbound---------+                        |
     69  * |            | <overseer n+1>   |                        |
     70  * |            | <idle (n+1)a>    |                        |
     71  * |            | <running (n+1)b> |                        |
     72  * |            +------------------+                        |
     73  * +--------------------------------------------------------+
     74  *
     75  * XXX Why one overseer per CPU?  I did that originally to avoid
     76  * touching remote CPUs' memory when scheduling a job, but that still
     77  * requires interprocessor synchronization.  Perhaps we could get by
     78  * with a single overseer thread, at the expense of another pointer in
     79  * struct threadpool_job to identify the CPU on which it must run
     80  * in order for the overseer to schedule it correctly.
     81  */
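
         /*
          * Example usage (a sketch, not compiled as part of this file):
          * scheduling work on the shared unbound pool at PRI_NONE.  The
          * names example_softc, example_job_fn, example_attach,
          * example_kick, and example_detach are hypothetical and serve
          * only to illustrate the calls described above.
          *
          *	struct example_softc {
          *		kmutex_t		sc_lock;
          *		struct threadpool	*sc_pool;
          *		struct threadpool_job	sc_job;
          *	};
          *
          *	static void
          *	example_job_fn(struct threadpool_job *job)
          *	{
          *		struct example_softc *sc =
          *		    container_of(job, struct example_softc, sc_job);
          *
          *		... do the actual work here ...
          *
          *		mutex_enter(&sc->sc_lock);
          *		threadpool_job_done(job);
          *		mutex_exit(&sc->sc_lock);
          *	}
          *
          *	static int
          *	example_attach(struct example_softc *sc)
          *	{
          *		int error;
          *
          *		mutex_init(&sc->sc_lock, MUTEX_DEFAULT, IPL_NONE);
          *		error = threadpool_get(&sc->sc_pool, PRI_NONE);
          *		if (error)
          *			return error;
          *		threadpool_job_init(&sc->sc_job, example_job_fn,
          *		    &sc->sc_lock, "examplejob");
          *		return 0;
          *	}
          *
          *	static void
          *	example_kick(struct example_softc *sc)
          *	{
          *		mutex_enter(&sc->sc_lock);
          *		threadpool_schedule_job(sc->sc_pool, &sc->sc_job);
          *		mutex_exit(&sc->sc_lock);
          *	}
          *
          *	static void
          *	example_detach(struct example_softc *sc)
          *	{
          *		mutex_enter(&sc->sc_lock);
          *		threadpool_cancel_job(sc->sc_pool, &sc->sc_job);
          *		mutex_exit(&sc->sc_lock);
          *		threadpool_job_destroy(&sc->sc_job);
          *		threadpool_put(sc->sc_pool, PRI_NONE);
          *		mutex_destroy(&sc->sc_lock);
          *	}
          */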
     82 
     83 #include <sys/cdefs.h>
     84 __KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.16 2020/02/09 22:57:26 riastradh Exp $");
     85 
     86 #include <sys/types.h>
     87 #include <sys/param.h>
     88 #include <sys/atomic.h>
     89 #include <sys/condvar.h>
     90 #include <sys/cpu.h>
     91 #include <sys/kernel.h>
     92 #include <sys/kmem.h>
     93 #include <sys/kthread.h>
     94 #include <sys/mutex.h>
     95 #include <sys/once.h>
     96 #include <sys/percpu.h>
     97 #include <sys/pool.h>
     98 #include <sys/proc.h>
     99 #include <sys/queue.h>
    100 #include <sys/systm.h>
    101 #include <sys/sysctl.h>
    102 #include <sys/threadpool.h>
    103 
    104 /* Data structures */
    105 
    106 TAILQ_HEAD(job_head, threadpool_job);
    107 TAILQ_HEAD(thread_head, threadpool_thread);
    108 
    109 struct threadpool_thread {
    110 	struct lwp			*tpt_lwp;
    111 	char				*tpt_lwp_savedname;
    112 	struct threadpool		*tpt_pool;
    113 	struct threadpool_job		*tpt_job;
    114 	kcondvar_t			tpt_cv;
    115 	TAILQ_ENTRY(threadpool_thread)	tpt_entry;
    116 };
    117 
    118 struct threadpool {
    119 	kmutex_t			tp_lock;
    120 	struct threadpool_thread	tp_overseer;
    121 	struct job_head			tp_jobs;
    122 	struct thread_head		tp_idle_threads;
    123 	uint64_t			tp_refcnt;
    124 	int				tp_flags;
    125 #define	THREADPOOL_DYING	0x01
    126 	struct cpu_info			*tp_cpu;
    127 	pri_t				tp_pri;
    128 };
    129 
    130 static void	threadpool_hold(struct threadpool *);
    131 static void	threadpool_rele(struct threadpool *);
    132 
    133 static int	threadpool_percpu_create(struct threadpool_percpu **, pri_t);
    134 static void	threadpool_percpu_destroy(struct threadpool_percpu *);
    135 static void	threadpool_percpu_init(void *, void *, struct cpu_info *);
    136 static void	threadpool_percpu_ok(void *, void *, struct cpu_info *);
    137 static void	threadpool_percpu_fini(void *, void *, struct cpu_info *);
    138 
    139 static threadpool_job_fn_t threadpool_job_dead;
    140 
    141 static void	threadpool_job_hold(struct threadpool_job *);
    142 static void	threadpool_job_rele(struct threadpool_job *);
    143 
    144 static void	threadpool_overseer_thread(void *) __dead;
    145 static void	threadpool_thread(void *) __dead;
    146 
    147 static pool_cache_t	threadpool_thread_pc __read_mostly;
    148 
    149 static kmutex_t		threadpools_lock __cacheline_aligned;
    150 
    151 	/* Default to 30 second idle timeout for pool threads. */
    152 static int	threadpool_idle_time_ms = 30 * 1000;
    153 
    154 struct threadpool_unbound {
    155 	struct threadpool		tpu_pool;
    156 
    157 	/* protected by threadpools_lock */
    158 	LIST_ENTRY(threadpool_unbound)	tpu_link;
    159 	uint64_t			tpu_refcnt;
    160 };
    161 
    162 static LIST_HEAD(, threadpool_unbound) unbound_threadpools;
    163 
    164 static struct threadpool_unbound *
    165 threadpool_lookup_unbound(pri_t pri)
    166 {
    167 	struct threadpool_unbound *tpu;
    168 
    169 	LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) {
    170 		if (tpu->tpu_pool.tp_pri == pri)
    171 			return tpu;
    172 	}
    173 	return NULL;
    174 }
    175 
    176 static void
    177 threadpool_insert_unbound(struct threadpool_unbound *tpu)
    178 {
    179 	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL);
    180 	LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link);
    181 }
    182 
    183 static void
    184 threadpool_remove_unbound(struct threadpool_unbound *tpu)
    185 {
    186 	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu);
    187 	LIST_REMOVE(tpu, tpu_link);
    188 }
    189 
    190 struct threadpool_percpu {
    191 	percpu_t *			tpp_percpu;
    192 	pri_t				tpp_pri;
    193 
    194 	/* protected by threadpools_lock */
    195 	LIST_ENTRY(threadpool_percpu)	tpp_link;
    196 	uint64_t			tpp_refcnt;
    197 };
    198 
    199 static LIST_HEAD(, threadpool_percpu) percpu_threadpools;
    200 
    201 static struct threadpool_percpu *
    202 threadpool_lookup_percpu(pri_t pri)
    203 {
    204 	struct threadpool_percpu *tpp;
    205 
    206 	LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) {
    207 		if (tpp->tpp_pri == pri)
    208 			return tpp;
    209 	}
    210 	return NULL;
    211 }
    212 
    213 static void
    214 threadpool_insert_percpu(struct threadpool_percpu *tpp)
    215 {
    216 	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL);
    217 	LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link);
    218 }
    219 
    220 static void
    221 threadpool_remove_percpu(struct threadpool_percpu *tpp)
    222 {
    223 	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp);
    224 	LIST_REMOVE(tpp, tpp_link);
    225 }
    226 
    227 #ifdef THREADPOOL_VERBOSE
    228 #define	TP_LOG(x)		printf x
    229 #else
    230 #define	TP_LOG(x)		/* nothing */
    231 #endif /* THREADPOOL_VERBOSE */
    232 
    233 static int
    234 sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS)
    235 {
    236 	struct sysctlnode node;
    237 	int val, error;
    238 
    239 	node = *rnode;
    240 
    241 	val = threadpool_idle_time_ms;
    242 	node.sysctl_data = &val;
    243 	error = sysctl_lookup(SYSCTLFN_CALL(&node));
    244 	if (error == 0 && newp != NULL) {
    245 		/* Disallow negative values and 0 (forever). */
    246 		if (val < 1)
    247 			error = EINVAL;
    248 		else
    249 			threadpool_idle_time_ms = val;
    250 	}
    251 
    252 	return error;
    253 }
    254 
    255 SYSCTL_SETUP_PROTO(sysctl_threadpool_setup);
    256 
    257 SYSCTL_SETUP(sysctl_threadpool_setup,
    258     "sysctl kern.threadpool subtree setup")
    259 {
    260 	const struct sysctlnode *rnode, *cnode;
    261 	int error __diagused;
    262 
    263 	error = sysctl_createv(clog, 0, NULL, &rnode,
    264 	    CTLFLAG_PERMANENT,
    265 	    CTLTYPE_NODE, "threadpool",
    266 	    SYSCTL_DESCR("threadpool subsystem options"),
    267 	    NULL, 0, NULL, 0,
    268 	    CTL_KERN, CTL_CREATE, CTL_EOL);
    269 	KASSERT(error == 0);
    270 
    271 	error = sysctl_createv(clog, 0, &rnode, &cnode,
    272 	    CTLFLAG_PERMANENT | CTLFLAG_READWRITE,
    273 	    CTLTYPE_INT, "idle_ms",
    274 	    SYSCTL_DESCR("idle thread timeout in ms"),
    275 	    sysctl_kern_threadpool_idle_ms, 0, NULL, 0,
    276 	    CTL_CREATE, CTL_EOL);
    277 	KASSERT(error == 0);
    278 }
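
         /*
          * Example (a sketch): with the nodes created above, the idle
          * timeout can be tuned at run time with sysctl(8), e.g.
          *
          *	# sysctl -w kern.threadpool.idle_ms=60000
          *
          * Values below 1 are rejected with EINVAL by the handler above.
          */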
    279 
    280 void
    281 threadpools_init(void)
    282 {
    283 
    284 	threadpool_thread_pc =
    285 	    pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0,
    286 		"thplthrd", NULL, IPL_NONE, NULL, NULL, NULL);
    287 
    288 	LIST_INIT(&unbound_threadpools);
    289 	LIST_INIT(&percpu_threadpools);
    290 	mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);
    291 }
    292 
    293 /* Thread pool creation */
    294 
    295 static bool
    296 threadpool_pri_is_valid(pri_t pri)
    297 {
    298 	return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT));
    299 }
    300 
    301 static int
    302 threadpool_create(struct threadpool *const pool, struct cpu_info *ci,
    303     pri_t pri)
    304 {
    305 	struct lwp *lwp;
    306 	int ktflags;
    307 	int error;
    308 
    309 	KASSERT(threadpool_pri_is_valid(pri));
    310 
    311 	mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
    312 	/* XXX overseer */
    313 	TAILQ_INIT(&pool->tp_jobs);
    314 	TAILQ_INIT(&pool->tp_idle_threads);
    315 	pool->tp_refcnt = 1;		/* overseer's reference */
    316 	pool->tp_flags = 0;
    317 	pool->tp_cpu = ci;
    318 	pool->tp_pri = pri;
    319 
    320 	pool->tp_overseer.tpt_lwp = NULL;
    321 	pool->tp_overseer.tpt_pool = pool;
    322 	pool->tp_overseer.tpt_job = NULL;
    323 	cv_init(&pool->tp_overseer.tpt_cv, "poolover");
    324 
    325 	ktflags = 0;
    326 	ktflags |= KTHREAD_MPSAFE;
    327 	if (pri < PRI_KERNEL)
    328 		ktflags |= KTHREAD_TS;
    329 	error = kthread_create(pri, ktflags, ci, &threadpool_overseer_thread,
    330 	    &pool->tp_overseer, &lwp,
    331 	    "pooloverseer/%d@%d", (ci ? cpu_index(ci) : -1), (int)pri);
    332 	if (error)
    333 		goto fail0;
    334 
    335 	mutex_spin_enter(&pool->tp_lock);
    336 	pool->tp_overseer.tpt_lwp = lwp;
    337 	cv_broadcast(&pool->tp_overseer.tpt_cv);
    338 	mutex_spin_exit(&pool->tp_lock);
    339 
    340 	return 0;
    341 
    342 fail0:	KASSERT(error);
    343 	KASSERT(pool->tp_overseer.tpt_job == NULL);
    344 	KASSERT(pool->tp_overseer.tpt_pool == pool);
    345 	KASSERT(pool->tp_flags == 0);
     346 	KASSERT(pool->tp_refcnt == 1);	/* overseer's reference */
    347 	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
    348 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
    349 	KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
    350 	cv_destroy(&pool->tp_overseer.tpt_cv);
    351 	mutex_destroy(&pool->tp_lock);
    352 	return error;
    353 }
    354 
    355 /* Thread pool destruction */
    356 
    357 static void
    358 threadpool_destroy(struct threadpool *pool)
    359 {
    360 	struct threadpool_thread *thread;
    361 
    362 	/* Mark the pool dying and wait for threads to commit suicide.  */
    363 	mutex_spin_enter(&pool->tp_lock);
    364 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
    365 	pool->tp_flags |= THREADPOOL_DYING;
    366 	cv_broadcast(&pool->tp_overseer.tpt_cv);
    367 	TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
    368 		cv_broadcast(&thread->tpt_cv);
    369 	while (0 < pool->tp_refcnt) {
    370 		TP_LOG(("%s: draining %" PRIu64 " references...\n", __func__,
    371 		    pool->tp_refcnt));
    372 		cv_wait(&pool->tp_overseer.tpt_cv, &pool->tp_lock);
    373 	}
    374 	mutex_spin_exit(&pool->tp_lock);
    375 
    376 	KASSERT(pool->tp_overseer.tpt_job == NULL);
    377 	KASSERT(pool->tp_overseer.tpt_pool == pool);
    378 	KASSERT(pool->tp_flags == THREADPOOL_DYING);
    379 	KASSERT(pool->tp_refcnt == 0);
    380 	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
    381 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
    382 	KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
    383 	cv_destroy(&pool->tp_overseer.tpt_cv);
    384 	mutex_destroy(&pool->tp_lock);
    385 }
    386 
    387 static void
    388 threadpool_hold(struct threadpool *pool)
    389 {
    390 
    391 	KASSERT(mutex_owned(&pool->tp_lock));
    392 	pool->tp_refcnt++;
    393 	KASSERT(pool->tp_refcnt != 0);
    394 }
    395 
    396 static void
    397 threadpool_rele(struct threadpool *pool)
    398 {
    399 
    400 	KASSERT(mutex_owned(&pool->tp_lock));
    401 	KASSERT(0 < pool->tp_refcnt);
    402 	if (--pool->tp_refcnt == 0)
    403 		cv_broadcast(&pool->tp_overseer.tpt_cv);
    404 }
    405 
    406 /* Unbound thread pools */
    407 
    408 int
    409 threadpool_get(struct threadpool **poolp, pri_t pri)
    410 {
    411 	struct threadpool_unbound *tpu, *tmp = NULL;
    412 	int error;
    413 
    414 	ASSERT_SLEEPABLE();
    415 
    416 	if (! threadpool_pri_is_valid(pri))
    417 		return EINVAL;
    418 
    419 	mutex_enter(&threadpools_lock);
    420 	tpu = threadpool_lookup_unbound(pri);
    421 	if (tpu == NULL) {
    422 		mutex_exit(&threadpools_lock);
    423 		TP_LOG(("%s: No pool for pri=%d, creating one.\n",
    424 		    __func__, (int)pri));
    425 		tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP);
    426 		error = threadpool_create(&tmp->tpu_pool, NULL, pri);
    427 		if (error) {
    428 			kmem_free(tmp, sizeof(*tmp));
    429 			return error;
    430 		}
    431 		mutex_enter(&threadpools_lock);
    432 		tpu = threadpool_lookup_unbound(pri);
    433 		if (tpu == NULL) {
    434 			TP_LOG(("%s: Won the creation race for pri=%d.\n",
    435 			    __func__, (int)pri));
    436 			tpu = tmp;
    437 			tmp = NULL;
    438 			threadpool_insert_unbound(tpu);
    439 		}
    440 	}
    441 	KASSERT(tpu != NULL);
    442 	tpu->tpu_refcnt++;
    443 	KASSERT(tpu->tpu_refcnt != 0);
    444 	mutex_exit(&threadpools_lock);
    445 
    446 	if (tmp != NULL) {
    447 		threadpool_destroy(&tmp->tpu_pool);
    448 		kmem_free(tmp, sizeof(*tmp));
    449 	}
    450 	KASSERT(tpu != NULL);
    451 	*poolp = &tpu->tpu_pool;
    452 	return 0;
    453 }
    454 
    455 void
    456 threadpool_put(struct threadpool *pool, pri_t pri)
    457 {
    458 	struct threadpool_unbound *tpu =
    459 	    container_of(pool, struct threadpool_unbound, tpu_pool);
    460 
    461 	ASSERT_SLEEPABLE();
    462 
    463 	KASSERT(threadpool_pri_is_valid(pri));
    464 
    465 	mutex_enter(&threadpools_lock);
    466 	KASSERT(tpu == threadpool_lookup_unbound(pri));
    467 	KASSERT(0 < tpu->tpu_refcnt);
    468 	if (--tpu->tpu_refcnt == 0) {
    469 		TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
    470 		    __func__, (int)pri));
    471 		threadpool_remove_unbound(tpu);
    472 	} else {
    473 		tpu = NULL;
    474 	}
    475 	mutex_exit(&threadpools_lock);
    476 
    477 	if (tpu) {
    478 		threadpool_destroy(&tpu->tpu_pool);
    479 		kmem_free(tpu, sizeof(*tpu));
    480 	}
    481 }
    482 
    483 /* Per-CPU thread pools */
    484 
    485 int
    486 threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri)
    487 {
    488 	struct threadpool_percpu *pool_percpu, *tmp = NULL;
    489 	int error;
    490 
    491 	ASSERT_SLEEPABLE();
    492 
    493 	if (! threadpool_pri_is_valid(pri))
    494 		return EINVAL;
    495 
    496 	mutex_enter(&threadpools_lock);
    497 	pool_percpu = threadpool_lookup_percpu(pri);
    498 	if (pool_percpu == NULL) {
    499 		mutex_exit(&threadpools_lock);
    500 		TP_LOG(("%s: No pool for pri=%d, creating one.\n",
    501 		    __func__, (int)pri));
    502 		error = threadpool_percpu_create(&tmp, pri);
    503 		if (error)
    504 			return error;
    505 		KASSERT(tmp != NULL);
    506 		mutex_enter(&threadpools_lock);
    507 		pool_percpu = threadpool_lookup_percpu(pri);
    508 		if (pool_percpu == NULL) {
    509 			TP_LOG(("%s: Won the creation race for pri=%d.\n",
    510 			    __func__, (int)pri));
    511 			pool_percpu = tmp;
    512 			tmp = NULL;
    513 			threadpool_insert_percpu(pool_percpu);
    514 		}
    515 	}
    516 	KASSERT(pool_percpu != NULL);
    517 	pool_percpu->tpp_refcnt++;
    518 	KASSERT(pool_percpu->tpp_refcnt != 0);
    519 	mutex_exit(&threadpools_lock);
    520 
    521 	if (tmp != NULL)
    522 		threadpool_percpu_destroy(tmp);
    523 	KASSERT(pool_percpu != NULL);
    524 	*pool_percpup = pool_percpu;
    525 	return 0;
    526 }
    527 
    528 void
    529 threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri)
    530 {
    531 
    532 	ASSERT_SLEEPABLE();
    533 
    534 	KASSERT(threadpool_pri_is_valid(pri));
    535 
    536 	mutex_enter(&threadpools_lock);
    537 	KASSERT(pool_percpu == threadpool_lookup_percpu(pri));
    538 	KASSERT(0 < pool_percpu->tpp_refcnt);
    539 	if (--pool_percpu->tpp_refcnt == 0) {
    540 		TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
    541 		    __func__, (int)pri));
    542 		threadpool_remove_percpu(pool_percpu);
    543 	} else {
    544 		pool_percpu = NULL;
    545 	}
    546 	mutex_exit(&threadpools_lock);
    547 
    548 	if (pool_percpu)
    549 		threadpool_percpu_destroy(pool_percpu);
    550 }
    551 
    552 struct threadpool *
    553 threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
    554 {
    555 	struct threadpool **poolp, *pool;
    556 
    557 	poolp = percpu_getref(pool_percpu->tpp_percpu);
    558 	pool = *poolp;
    559 	percpu_putref(pool_percpu->tpp_percpu);
    560 
    561 	return pool;
    562 }
    563 
    564 struct threadpool *
    565 threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu,
    566     struct cpu_info *ci)
    567 {
    568 	struct threadpool **poolp, *pool;
    569 
    570 	percpu_traverse_enter();
    571 	poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
    572 	pool = *poolp;
    573 	percpu_traverse_exit();
    574 
    575 	return pool;
    576 }
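
         /*
          * Example (a sketch): scheduling a job on the member of a
          * per-CPU pool that is bound to the current CPU.  The softc and
          * job fields are hypothetical, as in the example near the top of
          * this file; sc_pool_percpu is assumed to be a
          * struct threadpool_percpu *.
          *
          *	error = threadpool_percpu_get(&sc->sc_pool_percpu, PRI_NONE);
          *	if (error)
          *		return error;
          *	...
          *	pool = threadpool_percpu_ref(sc->sc_pool_percpu);
          *	mutex_enter(&sc->sc_lock);
          *	threadpool_schedule_job(pool, &sc->sc_job);
          *	mutex_exit(&sc->sc_lock);
          *	...
          *	threadpool_percpu_put(sc->sc_pool_percpu, PRI_NONE);
          */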
    577 
    578 static int
    579 threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri)
    580 {
    581 	struct threadpool_percpu *pool_percpu;
    582 	bool ok = true;
    583 
    584 	pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP);
    585 	pool_percpu->tpp_pri = pri;
    586 	pool_percpu->tpp_percpu = percpu_create(sizeof(struct threadpool *),
    587 	    threadpool_percpu_init, threadpool_percpu_fini,
    588 	    (void *)(intptr_t)pri);
    589 
    590 	/*
    591 	 * Verify that all of the CPUs were initialized.
    592 	 *
    593 	 * XXX What to do if we add CPU hotplug?
    594 	 */
    595 	percpu_foreach(pool_percpu->tpp_percpu, &threadpool_percpu_ok, &ok);
    596 	if (!ok)
    597 		goto fail;
    598 
    599 	/* Success!  */
    600 	*pool_percpup = (struct threadpool_percpu *)pool_percpu;
    601 	return 0;
    602 
    603 fail:	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
    604 	kmem_free(pool_percpu, sizeof(*pool_percpu));
    605 	return ENOMEM;
    606 }
    607 
    608 static void
    609 threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
    610 {
    611 
    612 	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
    613 	kmem_free(pool_percpu, sizeof(*pool_percpu));
    614 }
    615 
    616 static void
    617 threadpool_percpu_init(void *vpoolp, void *vpri, struct cpu_info *ci)
    618 {
    619 	struct threadpool **const poolp = vpoolp;
    620 	pri_t pri = (intptr_t)(void *)vpri;
    621 	int error;
    622 
    623 	*poolp = kmem_zalloc(sizeof(**poolp), KM_SLEEP);
    624 	error = threadpool_create(*poolp, ci, pri);
    625 	if (error) {
    626 		KASSERT(error == ENOMEM);
    627 		kmem_free(*poolp, sizeof(**poolp));
    628 		*poolp = NULL;
    629 	}
    630 }
    631 
    632 static void
    633 threadpool_percpu_ok(void *vpoolp, void *vokp, struct cpu_info *ci)
    634 {
    635 	struct threadpool **const poolp = vpoolp;
    636 	bool *okp = vokp;
    637 
    638 	if (*poolp == NULL)
    639 		atomic_store_relaxed(okp, false);
    640 }
    641 
    642 static void
    643 threadpool_percpu_fini(void *vpoolp, void *vprip, struct cpu_info *ci)
    644 {
    645 	struct threadpool **const poolp = vpoolp;
    646 
    647 	if (*poolp == NULL)	/* initialization failed */
    648 		return;
    649 	threadpool_destroy(*poolp);
    650 	kmem_free(*poolp, sizeof(**poolp));
    651 }
    652 
    653 /* Thread pool jobs */
    654 
    655 void __printflike(4,5)
    656 threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
    657     kmutex_t *lock, const char *fmt, ...)
    658 {
    659 	va_list ap;
    660 
    661 	va_start(ap, fmt);
    662 	(void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
    663 	va_end(ap);
    664 
    665 	job->job_lock = lock;
    666 	job->job_thread = NULL;
    667 	job->job_refcnt = 0;
    668 	cv_init(&job->job_cv, job->job_name);
    669 	job->job_fn = fn;
    670 }
    671 
    672 static void
    673 threadpool_job_dead(struct threadpool_job *job)
    674 {
    675 
    676 	panic("threadpool job %p ran after destruction", job);
    677 }
    678 
    679 void
    680 threadpool_job_destroy(struct threadpool_job *job)
    681 {
    682 
    683 	ASSERT_SLEEPABLE();
    684 
    685 	KASSERTMSG((job->job_thread == NULL), "job %p still running", job);
    686 
    687 	mutex_enter(job->job_lock);
    688 	while (0 < job->job_refcnt)
    689 		cv_wait(&job->job_cv, job->job_lock);
    690 	mutex_exit(job->job_lock);
    691 
    692 	job->job_lock = NULL;
    693 	KASSERT(job->job_thread == NULL);
    694 	KASSERT(job->job_refcnt == 0);
    695 	KASSERT(!cv_has_waiters(&job->job_cv));
    696 	cv_destroy(&job->job_cv);
    697 	job->job_fn = threadpool_job_dead;
    698 	(void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
    699 }
    700 
    701 static void
    702 threadpool_job_hold(struct threadpool_job *job)
    703 {
    704 	unsigned int refcnt;
    705 
    706 	do {
    707 		refcnt = job->job_refcnt;
    708 		KASSERT(refcnt != UINT_MAX);
    709 	} while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1))
    710 	    != refcnt);
    711 }
    712 
    713 static void
    714 threadpool_job_rele(struct threadpool_job *job)
    715 {
    716 	unsigned int refcnt;
    717 
    718 	KASSERT(mutex_owned(job->job_lock));
    719 
    720 	do {
    721 		refcnt = job->job_refcnt;
    722 		KASSERT(0 < refcnt);
    723 		if (refcnt == 1) {
    724 			refcnt = atomic_dec_uint_nv(&job->job_refcnt);
    725 			KASSERT(refcnt != UINT_MAX);
    726 			if (refcnt == 0)
    727 				cv_broadcast(&job->job_cv);
    728 			return;
    729 		}
    730 	} while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt - 1))
    731 	    != refcnt);
    732 }
    733 
    734 void
    735 threadpool_job_done(struct threadpool_job *job)
    736 {
    737 
    738 	KASSERT(mutex_owned(job->job_lock));
    739 	KASSERT(job->job_thread != NULL);
    740 	KASSERT(job->job_thread->tpt_lwp == curlwp);
    741 
    742 	/*
    743 	 * We can safely read this field; it's only modified right before
    744 	 * we call the job work function, and we are only preserving it
    745 	 * to use here; no one cares if it contains junk afterward.
    746 	 */
    747 	lwp_lock(curlwp);
    748 	curlwp->l_name = job->job_thread->tpt_lwp_savedname;
    749 	lwp_unlock(curlwp);
    750 
    751 	/*
    752 	 * Inline the work of threadpool_job_rele(); the job is already
    753 	 * locked, the most likely scenario (XXXJRT only scenario?) is
    754 	 * that we're dropping the last reference (the one taken in
    755 	 * threadpool_schedule_job()), and we always do the cv_broadcast()
    756 	 * anyway.
    757 	 */
    758 	KASSERT(0 < job->job_refcnt);
    759 	unsigned int refcnt __diagused = atomic_dec_uint_nv(&job->job_refcnt);
    760 	KASSERT(refcnt != UINT_MAX);
    761 	cv_broadcast(&job->job_cv);
    762 	job->job_thread = NULL;
    763 }
    764 
    765 void
    766 threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
    767 {
    768 
    769 	KASSERT(mutex_owned(job->job_lock));
    770 
    771 	/*
    772 	 * If the job's already running, let it keep running.  The job
    773 	 * is guaranteed by the interlock not to end early -- if it had
    774 	 * ended early, threadpool_job_done would have set job_thread
    775 	 * to NULL under the interlock.
    776 	 */
    777 	if (__predict_true(job->job_thread != NULL)) {
     778 		TP_LOG(("%s: job '%s' already running.\n",
    779 		    __func__, job->job_name));
    780 		return;
    781 	}
    782 
    783 	threadpool_job_hold(job);
    784 
    785 	/* Otherwise, try to assign a thread to the job.  */
    786 	mutex_spin_enter(&pool->tp_lock);
    787 	if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
    788 		/* Nobody's idle.  Give it to the overseer.  */
    789 		TP_LOG(("%s: giving job '%s' to overseer.\n",
    790 		    __func__, job->job_name));
    791 		job->job_thread = &pool->tp_overseer;
    792 		TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
    793 	} else {
    794 		/* Assign it to the first idle thread.  */
    795 		job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
    796 		TP_LOG(("%s: giving job '%s' to idle thread %p.\n",
    797 		    __func__, job->job_name, job->job_thread));
    798 		TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
    799 		    tpt_entry);
    800 		job->job_thread->tpt_job = job;
    801 	}
    802 
    803 	/* Notify whomever we gave it to, overseer or idle thread.  */
    804 	KASSERT(job->job_thread != NULL);
    805 	cv_broadcast(&job->job_thread->tpt_cv);
    806 	mutex_spin_exit(&pool->tp_lock);
    807 }
    808 
    809 bool
    810 threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
    811 {
    812 
    813 	KASSERT(mutex_owned(job->job_lock));
    814 
    815 	/*
    816 	 * XXXJRT This fails (albeit safely) when all of the following
    817 	 * are true:
    818 	 *
    819 	 *	=> "pool" is something other than what the job was
    820 	 *	   scheduled on.  This can legitimately occur if,
    821 	 *	   for example, a job is percpu-scheduled on CPU0
    822 	 *	   and then CPU1 attempts to cancel it without taking
    823 	 *	   a remote pool reference.  (this might happen by
    824 	 *	   "luck of the draw").
    825 	 *
    826 	 *	=> "job" is not yet running, but is assigned to the
    827 	 *	   overseer.
    828 	 *
    829 	 * When this happens, this code makes the determination that
    830 	 * the job is already running.  The failure mode is that the
    831 	 * caller is told the job is running, and thus has to wait.
    832 	 * The overseer will eventually get to it and the job will
    833 	 * proceed as if it had been already running.
    834 	 */
    835 
    836 	if (job->job_thread == NULL) {
    837 		/* Nothing to do.  Guaranteed not running.  */
    838 		return true;
    839 	} else if (job->job_thread == &pool->tp_overseer) {
    840 		/* Take it off the list to guarantee it won't run.  */
    841 		job->job_thread = NULL;
    842 		mutex_spin_enter(&pool->tp_lock);
    843 		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
    844 		mutex_spin_exit(&pool->tp_lock);
    845 		threadpool_job_rele(job);
    846 		return true;
    847 	} else {
    848 		/* Too late -- already running.  */
    849 		return false;
    850 	}
    851 }
    852 
    853 void
    854 threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
    855 {
    856 
    857 	ASSERT_SLEEPABLE();
    858 
    859 	KASSERT(mutex_owned(job->job_lock));
    860 
    861 	if (threadpool_cancel_job_async(pool, job))
    862 		return;
    863 
    864 	/* Already running.  Wait for it to complete.  */
    865 	while (job->job_thread != NULL)
    866 		cv_wait(&job->job_cv, job->job_lock);
    867 }
    868 
    869 /* Thread pool overseer thread */
    870 
    871 static void __dead
    872 threadpool_overseer_thread(void *arg)
    873 {
    874 	struct threadpool_thread *const overseer = arg;
    875 	struct threadpool *const pool = overseer->tpt_pool;
    876 	struct lwp *lwp = NULL;
    877 	int ktflags;
    878 	int error;
    879 
    880 	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
    881 
    882 	/* Wait until we're initialized.  */
    883 	mutex_spin_enter(&pool->tp_lock);
    884 	while (overseer->tpt_lwp == NULL)
    885 		cv_wait(&overseer->tpt_cv, &pool->tp_lock);
    886 
    887 	TP_LOG(("%s: starting.\n", __func__));
    888 
    889 	for (;;) {
    890 		/* Wait until there's a job.  */
    891 		while (TAILQ_EMPTY(&pool->tp_jobs)) {
    892 			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
    893 				TP_LOG(("%s: THREADPOOL_DYING\n",
    894 				    __func__));
    895 				break;
    896 			}
    897 			cv_wait(&overseer->tpt_cv, &pool->tp_lock);
    898 		}
    899 		if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
    900 			break;
    901 
    902 		/* If there are no threads, we'll have to try to start one.  */
    903 		if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
    904 			TP_LOG(("%s: Got a job, need to create a thread.\n",
    905 			    __func__));
    906 			threadpool_hold(pool);
    907 			mutex_spin_exit(&pool->tp_lock);
    908 
    909 			struct threadpool_thread *const thread =
    910 			    pool_cache_get(threadpool_thread_pc, PR_WAITOK);
    911 			thread->tpt_lwp = NULL;
    912 			thread->tpt_pool = pool;
    913 			thread->tpt_job = NULL;
    914 			cv_init(&thread->tpt_cv, "poolthrd");
    915 
    916 			ktflags = 0;
    917 			ktflags |= KTHREAD_MPSAFE;
    918 			if (pool->tp_pri < PRI_KERNEL)
    919 				ktflags |= KTHREAD_TS;
    920 			error = kthread_create(pool->tp_pri, ktflags,
    921 			    pool->tp_cpu, &threadpool_thread, thread, &lwp,
    922 			    "poolthread/%d@%d",
    923 			    (pool->tp_cpu ? cpu_index(pool->tp_cpu) : -1),
    924 			    (int)pool->tp_pri);
    925 
    926 			mutex_spin_enter(&pool->tp_lock);
    927 			if (error) {
    928 				pool_cache_put(threadpool_thread_pc, thread);
    929 				threadpool_rele(pool);
    930 				/* XXX What to do to wait for memory?  */
    931 				(void)kpause("thrdplcr", false, hz,
    932 				    &pool->tp_lock);
    933 				continue;
    934 			}
    935 			/*
    936 			 * New kthread now owns the reference to the pool
    937 			 * taken above.
    938 			 */
    939 			KASSERT(lwp != NULL);
    940 			TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread,
    941 			    tpt_entry);
    942 			thread->tpt_lwp = lwp;
    943 			lwp = NULL;
    944 			cv_broadcast(&thread->tpt_cv);
    945 			continue;
    946 		}
    947 
    948 		/* There are idle threads, so try giving one a job.  */
    949 		struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);
    950 		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
    951 		/*
    952 		 * Take an extra reference on the job temporarily so that
    953 		 * it won't disappear on us while we have both locks dropped.
    954 		 */
    955 		threadpool_job_hold(job);
    956 		mutex_spin_exit(&pool->tp_lock);
    957 
    958 		mutex_enter(job->job_lock);
    959 		/* If the job was cancelled, we'll no longer be its thread.  */
    960 		if (__predict_true(job->job_thread == overseer)) {
    961 			mutex_spin_enter(&pool->tp_lock);
    962 			if (__predict_false(
    963 				    TAILQ_EMPTY(&pool->tp_idle_threads))) {
    964 				/*
    965 				 * Someone else snagged the thread
    966 				 * first.  We'll have to try again.
    967 				 */
    968 				TP_LOG(("%s: '%s' lost race to use idle thread.\n",
    969 				    __func__, job->job_name));
    970 				TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
    971 				    job_entry);
    972 			} else {
    973 				/*
    974 				 * Assign the job to the thread and
    975 				 * wake the thread so it starts work.
    976 				 */
    977 				struct threadpool_thread *const thread =
    978 				    TAILQ_FIRST(&pool->tp_idle_threads);
    979 
    980 				TP_LOG(("%s: '%s' gets thread %p\n",
    981 				    __func__, job->job_name, thread));
    982 				KASSERT(thread->tpt_job == NULL);
    983 				TAILQ_REMOVE(&pool->tp_idle_threads, thread,
    984 				    tpt_entry);
    985 				thread->tpt_job = job;
    986 				job->job_thread = thread;
    987 				cv_broadcast(&thread->tpt_cv);
    988 			}
    989 			mutex_spin_exit(&pool->tp_lock);
    990 		}
    991 		threadpool_job_rele(job);
    992 		mutex_exit(job->job_lock);
    993 
    994 		mutex_spin_enter(&pool->tp_lock);
    995 	}
    996 	threadpool_rele(pool);
    997 	mutex_spin_exit(&pool->tp_lock);
    998 
    999 	TP_LOG(("%s: exiting.\n", __func__));
   1000 
   1001 	kthread_exit(0);
   1002 }
   1003 
   1004 /* Thread pool thread */
   1005 
   1006 static void __dead
   1007 threadpool_thread(void *arg)
   1008 {
   1009 	struct threadpool_thread *const thread = arg;
   1010 	struct threadpool *const pool = thread->tpt_pool;
   1011 
   1012 	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
   1013 
   1014 	/* Wait until we're initialized and on the queue.  */
   1015 	mutex_spin_enter(&pool->tp_lock);
   1016 	while (thread->tpt_lwp == NULL)
   1017 		cv_wait(&thread->tpt_cv, &pool->tp_lock);
   1018 
   1019 	TP_LOG(("%s: starting.\n", __func__));
   1020 
   1021 	KASSERT(thread->tpt_lwp == curlwp);
   1022 	for (;;) {
   1023 		/* Wait until we are assigned a job.  */
   1024 		while (thread->tpt_job == NULL) {
   1025 			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
   1026 				TP_LOG(("%s: THREADPOOL_DYING\n",
   1027 				    __func__));
   1028 				break;
   1029 			}
   1030 			if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
   1031 				mstohz(threadpool_idle_time_ms)))
   1032 				break;
   1033 		}
   1034 		if (__predict_false(thread->tpt_job == NULL)) {
   1035 			TAILQ_REMOVE(&pool->tp_idle_threads, thread,
   1036 			    tpt_entry);
   1037 			break;
   1038 		}
   1039 
   1040 		struct threadpool_job *const job = thread->tpt_job;
   1041 		KASSERT(job != NULL);
   1042 
   1043 		/* Set our lwp name to reflect what job we're doing.  */
   1044 		lwp_lock(curlwp);
   1045 		char *const lwp_name __diagused = curlwp->l_name;
   1046 		thread->tpt_lwp_savedname = curlwp->l_name;
   1047 		curlwp->l_name = job->job_name;
   1048 		lwp_unlock(curlwp);
   1049 
   1050 		mutex_spin_exit(&pool->tp_lock);
   1051 
   1052 		TP_LOG(("%s: running job '%s' on thread %p.\n",
   1053 		    __func__, job->job_name, thread));
   1054 
   1055 		/* Run the job.  */
   1056 		(*job->job_fn)(job);
   1057 
   1058 		/* lwp name restored in threadpool_job_done(). */
   1059 		KASSERTMSG((curlwp->l_name == lwp_name),
   1060 		    "someone forgot to call threadpool_job_done()!");
   1061 
   1062 		/*
    1063 		 * We can compare pointers, but we can no longer dereference
    1064 		 * the job after this because threadpool_job_done() drops the
   1065 		 * last reference on the job while the job is locked.
   1066 		 */
   1067 
   1068 		mutex_spin_enter(&pool->tp_lock);
   1069 		KASSERT(thread->tpt_job == job);
   1070 		thread->tpt_job = NULL;
   1071 		TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
   1072 	}
   1073 	threadpool_rele(pool);
   1074 	mutex_spin_exit(&pool->tp_lock);
   1075 
   1076 	TP_LOG(("%s: thread %p exiting.\n", __func__, thread));
   1077 
   1078 	KASSERT(!cv_has_waiters(&thread->tpt_cv));
   1079 	cv_destroy(&thread->tpt_cv);
   1080 	pool_cache_put(threadpool_thread_pc, thread);
   1081 	kthread_exit(0);
   1082 }
   1083