      1 /*	$NetBSD: kern_threadpool.c,v 1.21 2021/01/13 02:20:15 riastradh Exp $	*/
      2 
      3 /*-
      4  * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
      5  * All rights reserved.
      6  *
      7  * This code is derived from software contributed to The NetBSD Foundation
      8  * by Taylor R. Campbell and Jason R. Thorpe.
      9  *
     10  * Redistribution and use in source and binary forms, with or without
     11  * modification, are permitted provided that the following conditions
     12  * are met:
     13  * 1. Redistributions of source code must retain the above copyright
     14  *    notice, this list of conditions and the following disclaimer.
     15  * 2. Redistributions in binary form must reproduce the above copyright
     16  *    notice, this list of conditions and the following disclaimer in the
     17  *    documentation and/or other materials provided with the distribution.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
     20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
     23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
     29  * POSSIBILITY OF SUCH DAMAGE.
     30  */
     31 
     32 /*
     33  * Thread pools.
     34  *
     35  * A thread pool is a collection of worker threads idle or running
      36  * jobs, together with a dispatcher thread that does not run jobs but
     37  * can be given jobs to assign to a worker thread.  Scheduling a job in
     38  * a thread pool does not allocate or even sleep at all, except perhaps
     39  * on an adaptive lock, unlike kthread_create.  Jobs reuse threads, so
     40  * they do not incur the expense of creating and destroying kthreads
     41  * unless there is not much work to be done.
     42  *
     43  * A per-CPU thread pool (threadpool_percpu) is a collection of thread
      44  * pools, one for each CPU, bound to that CPU.  For each priority level in
      45  * use, there is one shared unbound thread pool (i.e., a pool of threads
     46  * not bound to any CPU) and one shared per-CPU thread pool.
     47  *
     48  * To use the unbound thread pool at priority pri, call
     49  * threadpool_get(&pool, pri).  When you're done, call
     50  * threadpool_put(pool, pri).
     51  *
     52  * To use the per-CPU thread pools at priority pri, call
     53  * threadpool_percpu_get(&pool_percpu, pri), and then use the thread
     54  * pool returned by threadpool_percpu_ref(pool_percpu) for the current
     55  * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another
     56  * CPU.  When you're done, call threadpool_percpu_put(pool_percpu,
     57  * pri).
     58  *
     59  * +--MACHINE-----------------------------------------------------+
     60  * | +--CPU 0---------+ +--CPU 1---------+     +--CPU n---------+ |
     61  * | | <dispatcher 0> | | <dispatcher 1> | ... | <dispatcher n> | |
     62  * | | <idle 0a>      | | <running 1a>   | ... | <idle na>      | |
     63  * | | <running 0b>   | | <running 1b>   | ... | <idle nb>      | |
     64  * | | .              | | .              | ... | .              | |
     65  * | | .              | | .              | ... | .              | |
     66  * | | .              | | .              | ... | .              | |
     67  * | +----------------+ +----------------+     +----------------+ |
     68  * |            +--unbound-----------+                            |
     69  * |            | <dispatcher n+1>   |                            |
     70  * |            | <idle (n+1)a>      |                            |
     71  * |            | <running (n+1)b>   |                            |
     72  * |            +--------------------+                            |
     73  * +--------------------------------------------------------------+
     74  *
     75  * XXX Why one dispatcher per CPU?  I did that originally to avoid
     76  * touching remote CPUs' memory when scheduling a job, but that still
     77  * requires interprocessor synchronization.  Perhaps we could get by
     78  * with a single dispatcher thread, at the expense of another pointer
     79  * in struct threadpool_job to identify the CPU on which it must run in
     80  * order for the dispatcher to schedule it correctly.
     81  */
     82 
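         /*
          * Example (illustrative sketch only, not part of this file): a
          * hypothetical caller could drive the unbound pool at PRI_NONE
          * roughly as follows; example_fn, example_job, and example_lock
          * are assumed caller-owned names.  The job function must call
          * threadpool_job_done() with the job lock held before it returns.
          *
          *	static kmutex_t example_lock;
          *	static struct threadpool_job example_job;
          *
          *	static void
          *	example_fn(struct threadpool_job *job)
          *	{
          *
          *		...do the work...
          *		mutex_enter(&example_lock);
          *		threadpool_job_done(job);
          *		mutex_exit(&example_lock);
          *	}
          *
          *	struct threadpool *pool;
          *	int error;
          *
          *	error = threadpool_get(&pool, PRI_NONE);
          *	if (error)
          *		return error;
          *	mutex_init(&example_lock, MUTEX_DEFAULT, IPL_NONE);
          *	threadpool_job_init(&example_job, example_fn, &example_lock,
          *	    "example");
          *
          *	mutex_enter(&example_lock);
          *	threadpool_schedule_job(pool, &example_job);
          *	mutex_exit(&example_lock);
          *
          *	...
          *
          *	mutex_enter(&example_lock);
          *	threadpool_cancel_job(pool, &example_job);
          *	mutex_exit(&example_lock);
          *	threadpool_job_destroy(&example_job);
          *	threadpool_put(pool, PRI_NONE);
          */
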
     83 #include <sys/cdefs.h>
     84 __KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.21 2021/01/13 02:20:15 riastradh Exp $");
     85 
     86 #include <sys/types.h>
     87 #include <sys/param.h>
     88 #include <sys/atomic.h>
     89 #include <sys/condvar.h>
     90 #include <sys/cpu.h>
     91 #include <sys/kernel.h>
     92 #include <sys/kmem.h>
     93 #include <sys/kthread.h>
     94 #include <sys/mutex.h>
     95 #include <sys/once.h>
     96 #include <sys/percpu.h>
     97 #include <sys/pool.h>
     98 #include <sys/proc.h>
     99 #include <sys/queue.h>
    100 #include <sys/sdt.h>
    101 #include <sys/sysctl.h>
    102 #include <sys/systm.h>
    103 #include <sys/threadpool.h>
    104 
    105 /* Probes */
    106 
    107 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get,
    108     "pri_t"/*pri*/);
    109 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__create,
    110     "pri_t"/*pri*/);
    111 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__race,
    112     "pri_t"/*pri*/);
    113 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put,
    114     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
    115 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put__destroy,
    116     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
    117 
    118 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get,
    119     "pri_t"/*pri*/);
    120 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__create,
    121     "pri_t"/*pri*/);
    122 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__race,
    123     "pri_t"/*pri*/);
    124 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put,
     125     "struct threadpool_percpu *"/*pool_percpu*/, "pri_t"/*pri*/);
    126 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put__destroy,
     127     "struct threadpool_percpu *"/*pool_percpu*/, "pri_t"/*pri*/);
    128 
    129 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, create,
    130     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/);
    131 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__success,
    132     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "struct threadpool *"/*pool*/);
    133 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__failure,
    134     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "int"/*error*/);
    135 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, destroy,
    136     "struct threadpool *"/*pool*/);
    137 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, destroy__wait,
    138     "struct threadpool *"/*pool*/, "uint64_t"/*refcnt*/);
    139 
    140 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job,
    141     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
    142 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__running,
    143     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
    144 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__dispatcher,
    145     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
    146 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, schedule__job__thread,
    147     "struct threadpool *"/*pool*/,
    148     "struct threadpool_job *"/*job*/,
    149     "struct lwp *"/*thread*/);
    150 
    151 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__start,
    152     "struct threadpool *"/*pool*/);
    153 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__dying,
    154     "struct threadpool *"/*pool*/);
    155 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__spawn,
    156     "struct threadpool *"/*pool*/);
    157 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, dispatcher__race,
    158     "struct threadpool *"/*pool*/,
    159     "struct threadpool_job *"/*job*/);
    160 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, dispatcher__assign,
    161     "struct threadpool *"/*pool*/,
    162     "struct threadpool_job *"/*job*/,
    163     "struct lwp *"/*thread*/);
    164 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__exit,
    165     "struct threadpool *"/*pool*/);
    166 
    167 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__start,
    168     "struct threadpool *"/*pool*/);
    169 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__dying,
    170     "struct threadpool *"/*pool*/);
    171 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, thread__job,
    172     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
    173 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__exit,
    174     "struct threadpool *"/*pool*/);
    175 
    176 /* Data structures */
    177 
    178 TAILQ_HEAD(job_head, threadpool_job);
    179 TAILQ_HEAD(thread_head, threadpool_thread);
    180 
    181 struct threadpool_thread {
    182 	struct lwp			*tpt_lwp;
    183 	char				*tpt_lwp_savedname;
    184 	struct threadpool		*tpt_pool;
    185 	struct threadpool_job		*tpt_job;
    186 	kcondvar_t			tpt_cv;
    187 	TAILQ_ENTRY(threadpool_thread)	tpt_entry;
    188 };
    189 
    190 struct threadpool {
    191 	kmutex_t			tp_lock;
    192 	struct threadpool_thread	tp_dispatcher;
    193 	struct job_head			tp_jobs;
    194 	struct thread_head		tp_idle_threads;
    195 	uint64_t			tp_refcnt;
    196 	int				tp_flags;
    197 #define	THREADPOOL_DYING	0x01
    198 	struct cpu_info			*tp_cpu;
    199 	pri_t				tp_pri;
    200 };
    201 
    202 static void	threadpool_hold(struct threadpool *);
    203 static void	threadpool_rele(struct threadpool *);
    204 
    205 static int	threadpool_percpu_create(struct threadpool_percpu **, pri_t);
    206 static void	threadpool_percpu_destroy(struct threadpool_percpu *);
    207 static void	threadpool_percpu_init(void *, void *, struct cpu_info *);
    208 static void	threadpool_percpu_ok(void *, void *, struct cpu_info *);
    209 static void	threadpool_percpu_fini(void *, void *, struct cpu_info *);
    210 
    211 static threadpool_job_fn_t threadpool_job_dead;
    212 
    213 static void	threadpool_job_hold(struct threadpool_job *);
    214 static void	threadpool_job_rele(struct threadpool_job *);
    215 
    216 static void	threadpool_dispatcher_thread(void *) __dead;
    217 static void	threadpool_thread(void *) __dead;
    218 
    219 static pool_cache_t	threadpool_thread_pc __read_mostly;
    220 
    221 static kmutex_t		threadpools_lock __cacheline_aligned;
    222 
    223 	/* Default to 30 second idle timeout for pool threads. */
    224 static int	threadpool_idle_time_ms = 30 * 1000;
    225 
    226 struct threadpool_unbound {
    227 	struct threadpool		tpu_pool;
    228 
    229 	/* protected by threadpools_lock */
    230 	LIST_ENTRY(threadpool_unbound)	tpu_link;
    231 	uint64_t			tpu_refcnt;
    232 };
    233 
    234 static LIST_HEAD(, threadpool_unbound) unbound_threadpools;
    235 
    236 static struct threadpool_unbound *
    237 threadpool_lookup_unbound(pri_t pri)
    238 {
    239 	struct threadpool_unbound *tpu;
    240 
    241 	LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) {
    242 		if (tpu->tpu_pool.tp_pri == pri)
    243 			return tpu;
    244 	}
    245 	return NULL;
    246 }
    247 
    248 static void
    249 threadpool_insert_unbound(struct threadpool_unbound *tpu)
    250 {
    251 	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL);
    252 	LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link);
    253 }
    254 
    255 static void
    256 threadpool_remove_unbound(struct threadpool_unbound *tpu)
    257 {
    258 	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu);
    259 	LIST_REMOVE(tpu, tpu_link);
    260 }
    261 
    262 struct threadpool_percpu {
    263 	percpu_t *			tpp_percpu;
    264 	pri_t				tpp_pri;
    265 
    266 	/* protected by threadpools_lock */
    267 	LIST_ENTRY(threadpool_percpu)	tpp_link;
    268 	uint64_t			tpp_refcnt;
    269 };
    270 
    271 static LIST_HEAD(, threadpool_percpu) percpu_threadpools;
    272 
    273 static struct threadpool_percpu *
    274 threadpool_lookup_percpu(pri_t pri)
    275 {
    276 	struct threadpool_percpu *tpp;
    277 
    278 	LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) {
    279 		if (tpp->tpp_pri == pri)
    280 			return tpp;
    281 	}
    282 	return NULL;
    283 }
    284 
    285 static void
    286 threadpool_insert_percpu(struct threadpool_percpu *tpp)
    287 {
    288 	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL);
    289 	LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link);
    290 }
    291 
    292 static void
    293 threadpool_remove_percpu(struct threadpool_percpu *tpp)
    294 {
    295 	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp);
    296 	LIST_REMOVE(tpp, tpp_link);
    297 }
    298 
    299 static int
    300 sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS)
    301 {
    302 	struct sysctlnode node;
    303 	int val, error;
    304 
    305 	node = *rnode;
    306 
    307 	val = threadpool_idle_time_ms;
    308 	node.sysctl_data = &val;
    309 	error = sysctl_lookup(SYSCTLFN_CALL(&node));
    310 	if (error == 0 && newp != NULL) {
    311 		/* Disallow negative values and 0 (forever). */
    312 		if (val < 1)
    313 			error = EINVAL;
    314 		else
    315 			threadpool_idle_time_ms = val;
    316 	}
    317 
    318 	return error;
    319 }
    320 
    321 SYSCTL_SETUP_PROTO(sysctl_threadpool_setup);
    322 
    323 SYSCTL_SETUP(sysctl_threadpool_setup,
    324     "sysctl kern.threadpool subtree setup")
    325 {
    326 	const struct sysctlnode *rnode, *cnode;
    327 	int error __diagused;
    328 
    329 	error = sysctl_createv(clog, 0, NULL, &rnode,
    330 	    CTLFLAG_PERMANENT,
    331 	    CTLTYPE_NODE, "threadpool",
    332 	    SYSCTL_DESCR("threadpool subsystem options"),
    333 	    NULL, 0, NULL, 0,
    334 	    CTL_KERN, CTL_CREATE, CTL_EOL);
    335 	KASSERT(error == 0);
    336 
    337 	error = sysctl_createv(clog, 0, &rnode, &cnode,
    338 	    CTLFLAG_PERMANENT | CTLFLAG_READWRITE,
    339 	    CTLTYPE_INT, "idle_ms",
    340 	    SYSCTL_DESCR("idle thread timeout in ms"),
    341 	    sysctl_kern_threadpool_idle_ms, 0, NULL, 0,
    342 	    CTL_CREATE, CTL_EOL);
    343 	KASSERT(error == 0);
    344 }
    345 
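         /*
          * Example (illustrative, not part of this file): the knob created
          * above can be tuned at run time with sysctl(8), e.g.
          *
          *	# sysctl -w kern.threadpool.idle_ms=60000
          *
          * Values less than 1 are rejected by sysctl_kern_threadpool_idle_ms().
          */
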
    346 void
    347 threadpools_init(void)
    348 {
    349 
    350 	threadpool_thread_pc =
    351 	    pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0,
    352 		"thplthrd", NULL, IPL_NONE, NULL, NULL, NULL);
    353 
    354 	LIST_INIT(&unbound_threadpools);
    355 	LIST_INIT(&percpu_threadpools);
    356 	mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);
    357 }
    358 
    359 static void
    360 threadnamesuffix(char *buf, size_t buflen, struct cpu_info *ci, int pri)
    361 {
    362 
    363 	buf[0] = '\0';
    364 	if (ci)
    365 		snprintf(buf + strlen(buf), buflen - strlen(buf), "/%d",
    366 		    cpu_index(ci));
    367 	if (pri != PRI_NONE)
    368 		snprintf(buf + strlen(buf), buflen - strlen(buf), "@%d", pri);
    369 }
    370 
    371 /* Thread pool creation */
    372 
    373 static bool
    374 threadpool_pri_is_valid(pri_t pri)
    375 {
    376 	return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT));
    377 }
    378 
    379 static int
    380 threadpool_create(struct threadpool *const pool, struct cpu_info *ci,
    381     pri_t pri)
    382 {
    383 	struct lwp *lwp;
    384 	char suffix[16];
    385 	int ktflags;
    386 	int error;
    387 
    388 	KASSERT(threadpool_pri_is_valid(pri));
    389 
    390 	SDT_PROBE2(sdt, kernel, threadpool, create,  ci, pri);
    391 
    392 	mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
    393 	/* XXX dispatcher */
    394 	TAILQ_INIT(&pool->tp_jobs);
    395 	TAILQ_INIT(&pool->tp_idle_threads);
    396 	pool->tp_refcnt = 1;		/* dispatcher's reference */
    397 	pool->tp_flags = 0;
    398 	pool->tp_cpu = ci;
    399 	pool->tp_pri = pri;
    400 
    401 	pool->tp_dispatcher.tpt_lwp = NULL;
    402 	pool->tp_dispatcher.tpt_pool = pool;
    403 	pool->tp_dispatcher.tpt_job = NULL;
    404 	cv_init(&pool->tp_dispatcher.tpt_cv, "pooldisp");
    405 
    406 	ktflags = 0;
    407 	ktflags |= KTHREAD_MPSAFE;
    408 	if (pri < PRI_KERNEL)
    409 		ktflags |= KTHREAD_TS;
    410 	threadnamesuffix(suffix, sizeof(suffix), ci, pri);
    411 	error = kthread_create(pri, ktflags, ci, &threadpool_dispatcher_thread,
    412 	    &pool->tp_dispatcher, &lwp, "pooldisp%s", suffix);
    413 	if (error)
    414 		goto fail0;
    415 
    416 	mutex_spin_enter(&pool->tp_lock);
    417 	pool->tp_dispatcher.tpt_lwp = lwp;
    418 	cv_broadcast(&pool->tp_dispatcher.tpt_cv);
    419 	mutex_spin_exit(&pool->tp_lock);
    420 
    421 	SDT_PROBE3(sdt, kernel, threadpool, create__success,  ci, pri, pool);
    422 	return 0;
    423 
    424 fail0:	KASSERT(error);
    425 	KASSERT(pool->tp_dispatcher.tpt_job == NULL);
    426 	KASSERT(pool->tp_dispatcher.tpt_pool == pool);
    427 	KASSERT(pool->tp_flags == 0);
    428 	KASSERT(pool->tp_refcnt == 0);
    429 	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
    430 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
    431 	KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
    432 	cv_destroy(&pool->tp_dispatcher.tpt_cv);
    433 	mutex_destroy(&pool->tp_lock);
    434 	SDT_PROBE3(sdt, kernel, threadpool, create__failure,  ci, pri, error);
    435 	return error;
    436 }
    437 
    438 /* Thread pool destruction */
    439 
    440 static void
    441 threadpool_destroy(struct threadpool *pool)
    442 {
    443 	struct threadpool_thread *thread;
    444 
    445 	SDT_PROBE1(sdt, kernel, threadpool, destroy,  pool);
    446 
    447 	/* Mark the pool dying and wait for threads to commit suicide.  */
    448 	mutex_spin_enter(&pool->tp_lock);
    449 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
    450 	pool->tp_flags |= THREADPOOL_DYING;
    451 	cv_broadcast(&pool->tp_dispatcher.tpt_cv);
    452 	TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
    453 		cv_broadcast(&thread->tpt_cv);
    454 	while (0 < pool->tp_refcnt) {
    455 		SDT_PROBE2(sdt, kernel, threadpool, destroy__wait,
    456 		    pool, pool->tp_refcnt);
    457 		cv_wait(&pool->tp_dispatcher.tpt_cv, &pool->tp_lock);
    458 	}
    459 	mutex_spin_exit(&pool->tp_lock);
    460 
    461 	KASSERT(pool->tp_dispatcher.tpt_job == NULL);
    462 	KASSERT(pool->tp_dispatcher.tpt_pool == pool);
    463 	KASSERT(pool->tp_flags == THREADPOOL_DYING);
    464 	KASSERT(pool->tp_refcnt == 0);
    465 	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
    466 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
    467 	KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
    468 	cv_destroy(&pool->tp_dispatcher.tpt_cv);
    469 	mutex_destroy(&pool->tp_lock);
    470 }
    471 
    472 static void
    473 threadpool_hold(struct threadpool *pool)
    474 {
    475 
    476 	KASSERT(mutex_owned(&pool->tp_lock));
    477 	pool->tp_refcnt++;
    478 	KASSERT(pool->tp_refcnt != 0);
    479 }
    480 
    481 static void
    482 threadpool_rele(struct threadpool *pool)
    483 {
    484 
    485 	KASSERT(mutex_owned(&pool->tp_lock));
    486 	KASSERT(0 < pool->tp_refcnt);
    487 	if (--pool->tp_refcnt == 0)
    488 		cv_broadcast(&pool->tp_dispatcher.tpt_cv);
    489 }
    490 
    491 /* Unbound thread pools */
    492 
    493 int
    494 threadpool_get(struct threadpool **poolp, pri_t pri)
    495 {
    496 	struct threadpool_unbound *tpu, *tmp = NULL;
    497 	int error;
    498 
    499 	ASSERT_SLEEPABLE();
    500 
    501 	SDT_PROBE1(sdt, kernel, threadpool, get,  pri);
    502 
    503 	if (! threadpool_pri_is_valid(pri))
    504 		return EINVAL;
    505 
    506 	mutex_enter(&threadpools_lock);
    507 	tpu = threadpool_lookup_unbound(pri);
    508 	if (tpu == NULL) {
    509 		mutex_exit(&threadpools_lock);
    510 		SDT_PROBE1(sdt, kernel, threadpool, get__create,  pri);
    511 		tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP);
    512 		error = threadpool_create(&tmp->tpu_pool, NULL, pri);
    513 		if (error) {
    514 			kmem_free(tmp, sizeof(*tmp));
    515 			return error;
    516 		}
    517 		mutex_enter(&threadpools_lock);
    518 		tpu = threadpool_lookup_unbound(pri);
    519 		if (tpu == NULL) {
    520 			tpu = tmp;
    521 			tmp = NULL;
    522 			threadpool_insert_unbound(tpu);
    523 		} else {
    524 			SDT_PROBE1(sdt, kernel, threadpool, get__race,  pri);
    525 		}
    526 	}
    527 	KASSERT(tpu != NULL);
    528 	tpu->tpu_refcnt++;
    529 	KASSERT(tpu->tpu_refcnt != 0);
    530 	mutex_exit(&threadpools_lock);
    531 
    532 	if (tmp != NULL) {
    533 		threadpool_destroy(&tmp->tpu_pool);
    534 		kmem_free(tmp, sizeof(*tmp));
    535 	}
    536 	KASSERT(tpu != NULL);
    537 	*poolp = &tpu->tpu_pool;
    538 	return 0;
    539 }
    540 
    541 void
    542 threadpool_put(struct threadpool *pool, pri_t pri)
    543 {
    544 	struct threadpool_unbound *tpu =
    545 	    container_of(pool, struct threadpool_unbound, tpu_pool);
    546 
    547 	ASSERT_SLEEPABLE();
    548 	KASSERT(threadpool_pri_is_valid(pri));
    549 
    550 	SDT_PROBE2(sdt, kernel, threadpool, put,  pool, pri);
    551 
    552 	mutex_enter(&threadpools_lock);
    553 	KASSERT(tpu == threadpool_lookup_unbound(pri));
    554 	KASSERT(0 < tpu->tpu_refcnt);
    555 	if (--tpu->tpu_refcnt == 0) {
    556 		SDT_PROBE2(sdt, kernel, threadpool, put__destroy,  pool, pri);
    557 		threadpool_remove_unbound(tpu);
    558 	} else {
    559 		tpu = NULL;
    560 	}
    561 	mutex_exit(&threadpools_lock);
    562 
    563 	if (tpu) {
    564 		threadpool_destroy(&tpu->tpu_pool);
    565 		kmem_free(tpu, sizeof(*tpu));
    566 	}
    567 }
    568 
    569 /* Per-CPU thread pools */
    570 
    571 int
    572 threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri)
    573 {
    574 	struct threadpool_percpu *pool_percpu, *tmp = NULL;
    575 	int error;
    576 
    577 	ASSERT_SLEEPABLE();
    578 
    579 	SDT_PROBE1(sdt, kernel, threadpool, percpu__get,  pri);
    580 
    581 	if (! threadpool_pri_is_valid(pri))
    582 		return EINVAL;
    583 
    584 	mutex_enter(&threadpools_lock);
    585 	pool_percpu = threadpool_lookup_percpu(pri);
    586 	if (pool_percpu == NULL) {
    587 		mutex_exit(&threadpools_lock);
    588 		SDT_PROBE1(sdt, kernel, threadpool, percpu__get__create,  pri);
    589 		error = threadpool_percpu_create(&tmp, pri);
    590 		if (error)
    591 			return error;
    592 		KASSERT(tmp != NULL);
    593 		mutex_enter(&threadpools_lock);
    594 		pool_percpu = threadpool_lookup_percpu(pri);
    595 		if (pool_percpu == NULL) {
    596 			pool_percpu = tmp;
    597 			tmp = NULL;
    598 			threadpool_insert_percpu(pool_percpu);
    599 		} else {
    600 			SDT_PROBE1(sdt, kernel, threadpool, percpu__get__race,
    601 			    pri);
    602 		}
    603 	}
    604 	KASSERT(pool_percpu != NULL);
    605 	pool_percpu->tpp_refcnt++;
    606 	KASSERT(pool_percpu->tpp_refcnt != 0);
    607 	mutex_exit(&threadpools_lock);
    608 
    609 	if (tmp != NULL)
    610 		threadpool_percpu_destroy(tmp);
    611 	KASSERT(pool_percpu != NULL);
    612 	*pool_percpup = pool_percpu;
    613 	return 0;
    614 }
    615 
    616 void
    617 threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri)
    618 {
    619 
    620 	ASSERT_SLEEPABLE();
    621 
    622 	KASSERT(threadpool_pri_is_valid(pri));
    623 
    624 	SDT_PROBE2(sdt, kernel, threadpool, percpu__put,  pool_percpu, pri);
    625 
    626 	mutex_enter(&threadpools_lock);
    627 	KASSERT(pool_percpu == threadpool_lookup_percpu(pri));
    628 	KASSERT(0 < pool_percpu->tpp_refcnt);
    629 	if (--pool_percpu->tpp_refcnt == 0) {
    630 		SDT_PROBE2(sdt, kernel, threadpool, percpu__put__destroy,
    631 		    pool_percpu, pri);
    632 		threadpool_remove_percpu(pool_percpu);
    633 	} else {
    634 		pool_percpu = NULL;
    635 	}
    636 	mutex_exit(&threadpools_lock);
    637 
    638 	if (pool_percpu)
    639 		threadpool_percpu_destroy(pool_percpu);
    640 }
    641 
    642 struct threadpool *
    643 threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
    644 {
    645 	struct threadpool **poolp, *pool;
    646 
    647 	poolp = percpu_getref(pool_percpu->tpp_percpu);
    648 	pool = *poolp;
    649 	percpu_putref(pool_percpu->tpp_percpu);
    650 
    651 	return pool;
    652 }
    653 
    654 struct threadpool *
    655 threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu,
    656     struct cpu_info *ci)
    657 {
    658 	struct threadpool **poolp, *pool;
    659 
    660 	/*
    661 	 * As long as xcalls are blocked -- e.g., by kpreempt_disable
    662 	 * -- the percpu object will not be swapped and destroyed.  We
    663 	 * can't write to it, because the data may have already been
    664 	 * moved to a new buffer, but we can safely read from it.
    665 	 */
    666 	kpreempt_disable();
    667 	poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
    668 	pool = *poolp;
    669 	kpreempt_enable();
    670 
    671 	return pool;
    672 }
    673 
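         /*
          * Example (illustrative sketch only, not part of this file): a
          * hypothetical caller could run a job on the current CPU's member
          * of a per-CPU pool roughly like this, with example_job's lock
          * already held as threadpool_schedule_job() requires:
          *
          *	struct threadpool_percpu *pool_percpu;
          *	struct threadpool *pool;
          *	int error;
          *
          *	error = threadpool_percpu_get(&pool_percpu, PRI_NONE);
          *	if (error)
          *		return error;
          *	pool = threadpool_percpu_ref(pool_percpu);
          *	threadpool_schedule_job(pool, &example_job);
          *
          *	...wait for or cancel the job...
          *
          *	threadpool_percpu_put(pool_percpu, PRI_NONE);
          */
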
    674 static int
    675 threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri)
    676 {
    677 	struct threadpool_percpu *pool_percpu;
    678 	bool ok = true;
    679 
    680 	pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP);
    681 	pool_percpu->tpp_pri = pri;
    682 	pool_percpu->tpp_percpu = percpu_create(sizeof(struct threadpool *),
    683 	    threadpool_percpu_init, threadpool_percpu_fini,
    684 	    (void *)(intptr_t)pri);
    685 
    686 	/*
    687 	 * Verify that all of the CPUs were initialized.
    688 	 *
    689 	 * XXX What to do if we add CPU hotplug?
    690 	 */
    691 	percpu_foreach(pool_percpu->tpp_percpu, &threadpool_percpu_ok, &ok);
    692 	if (!ok)
    693 		goto fail;
    694 
    695 	/* Success!  */
     696 	*pool_percpup = pool_percpu;
    697 	return 0;
    698 
    699 fail:	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
    700 	kmem_free(pool_percpu, sizeof(*pool_percpu));
    701 	return ENOMEM;
    702 }
    703 
    704 static void
    705 threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
    706 {
    707 
    708 	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
    709 	kmem_free(pool_percpu, sizeof(*pool_percpu));
    710 }
    711 
    712 static void
    713 threadpool_percpu_init(void *vpoolp, void *vpri, struct cpu_info *ci)
    714 {
    715 	struct threadpool **const poolp = vpoolp;
    716 	pri_t pri = (intptr_t)(void *)vpri;
    717 	int error;
    718 
    719 	*poolp = kmem_zalloc(sizeof(**poolp), KM_SLEEP);
    720 	error = threadpool_create(*poolp, ci, pri);
    721 	if (error) {
    722 		KASSERT(error == ENOMEM);
    723 		kmem_free(*poolp, sizeof(**poolp));
    724 		*poolp = NULL;
    725 	}
    726 }
    727 
    728 static void
    729 threadpool_percpu_ok(void *vpoolp, void *vokp, struct cpu_info *ci)
    730 {
    731 	struct threadpool **const poolp = vpoolp;
    732 	bool *okp = vokp;
    733 
    734 	if (*poolp == NULL)
    735 		atomic_store_relaxed(okp, false);
    736 }
    737 
    738 static void
    739 threadpool_percpu_fini(void *vpoolp, void *vprip, struct cpu_info *ci)
    740 {
    741 	struct threadpool **const poolp = vpoolp;
    742 
    743 	if (*poolp == NULL)	/* initialization failed */
    744 		return;
    745 	threadpool_destroy(*poolp);
    746 	kmem_free(*poolp, sizeof(**poolp));
    747 }
    748 
    749 /* Thread pool jobs */
    750 
    751 void __printflike(4,5)
    752 threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
    753     kmutex_t *lock, const char *fmt, ...)
    754 {
    755 	va_list ap;
    756 
    757 	va_start(ap, fmt);
    758 	(void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
    759 	va_end(ap);
    760 
    761 	job->job_lock = lock;
    762 	job->job_thread = NULL;
    763 	job->job_refcnt = 0;
    764 	cv_init(&job->job_cv, job->job_name);
    765 	job->job_fn = fn;
    766 }
    767 
    768 static void
    769 threadpool_job_dead(struct threadpool_job *job)
    770 {
    771 
    772 	panic("threadpool job %p ran after destruction", job);
    773 }
    774 
    775 void
    776 threadpool_job_destroy(struct threadpool_job *job)
    777 {
    778 
    779 	ASSERT_SLEEPABLE();
    780 
    781 	KASSERTMSG((job->job_thread == NULL), "job %p still running", job);
    782 
    783 	mutex_enter(job->job_lock);
    784 	while (0 < atomic_load_relaxed(&job->job_refcnt))
    785 		cv_wait(&job->job_cv, job->job_lock);
    786 	mutex_exit(job->job_lock);
    787 
    788 	job->job_lock = NULL;
    789 	KASSERT(job->job_thread == NULL);
    790 	KASSERT(job->job_refcnt == 0);
    791 	KASSERT(!cv_has_waiters(&job->job_cv));
    792 	cv_destroy(&job->job_cv);
    793 	job->job_fn = threadpool_job_dead;
    794 	(void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
    795 }
    796 
    797 static void
    798 threadpool_job_hold(struct threadpool_job *job)
    799 {
    800 	unsigned int refcnt __diagused;
    801 
    802 	refcnt = atomic_inc_uint_nv(&job->job_refcnt);
    803 	KASSERT(refcnt != 0);
    804 }
    805 
    806 static void
    807 threadpool_job_rele(struct threadpool_job *job)
    808 {
    809 	unsigned int refcnt;
    810 
    811 	KASSERT(mutex_owned(job->job_lock));
    812 
    813 	refcnt = atomic_dec_uint_nv(&job->job_refcnt);
    814 	KASSERT(refcnt != UINT_MAX);
    815 	if (refcnt == 0)
    816 		cv_broadcast(&job->job_cv);
    817 }
    818 
    819 void
    820 threadpool_job_done(struct threadpool_job *job)
    821 {
    822 
    823 	KASSERT(mutex_owned(job->job_lock));
    824 	KASSERT(job->job_thread != NULL);
    825 	KASSERT(job->job_thread->tpt_lwp == curlwp);
    826 
    827 	/*
    828 	 * We can safely read this field; it's only modified right before
    829 	 * we call the job work function, and we are only preserving it
    830 	 * to use here; no one cares if it contains junk afterward.
    831 	 */
    832 	lwp_lock(curlwp);
    833 	curlwp->l_name = job->job_thread->tpt_lwp_savedname;
    834 	lwp_unlock(curlwp);
    835 
    836 	/*
    837 	 * Inline the work of threadpool_job_rele(); the job is already
    838 	 * locked, the most likely scenario (XXXJRT only scenario?) is
    839 	 * that we're dropping the last reference (the one taken in
    840 	 * threadpool_schedule_job()), and we always do the cv_broadcast()
    841 	 * anyway.
    842 	 */
    843 	KASSERT(0 < atomic_load_relaxed(&job->job_refcnt));
    844 	unsigned int refcnt __diagused = atomic_dec_uint_nv(&job->job_refcnt);
    845 	KASSERT(refcnt != UINT_MAX);
    846 	cv_broadcast(&job->job_cv);
    847 	job->job_thread = NULL;
    848 }
    849 
    850 void
    851 threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
    852 {
    853 
    854 	KASSERT(mutex_owned(job->job_lock));
    855 
    856 	SDT_PROBE2(sdt, kernel, threadpool, schedule__job,  pool, job);
    857 
    858 	/*
    859 	 * If the job's already running, let it keep running.  The job
    860 	 * is guaranteed by the interlock not to end early -- if it had
    861 	 * ended early, threadpool_job_done would have set job_thread
    862 	 * to NULL under the interlock.
    863 	 */
    864 	if (__predict_true(job->job_thread != NULL)) {
    865 		SDT_PROBE2(sdt, kernel, threadpool, schedule__job__running,
    866 		    pool, job);
    867 		return;
    868 	}
    869 
    870 	threadpool_job_hold(job);
    871 
    872 	/* Otherwise, try to assign a thread to the job.  */
    873 	mutex_spin_enter(&pool->tp_lock);
    874 	if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
    875 		/* Nobody's idle.  Give it to the dispatcher.  */
    876 		SDT_PROBE2(sdt, kernel, threadpool, schedule__job__dispatcher,
    877 		    pool, job);
    878 		job->job_thread = &pool->tp_dispatcher;
    879 		TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
    880 	} else {
    881 		/* Assign it to the first idle thread.  */
    882 		job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
    883 		SDT_PROBE3(sdt, kernel, threadpool, schedule__job__thread,
    884 		    pool, job, job->job_thread->tpt_lwp);
    885 		TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
    886 		    tpt_entry);
    887 		job->job_thread->tpt_job = job;
    888 	}
    889 
    890 	/* Notify whomever we gave it to, dispatcher or idle thread.  */
    891 	KASSERT(job->job_thread != NULL);
    892 	cv_broadcast(&job->job_thread->tpt_cv);
    893 	mutex_spin_exit(&pool->tp_lock);
    894 }
    895 
    896 bool
    897 threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
    898 {
    899 
    900 	KASSERT(mutex_owned(job->job_lock));
    901 
    902 	/*
    903 	 * XXXJRT This fails (albeit safely) when all of the following
    904 	 * are true:
    905 	 *
    906 	 *	=> "pool" is something other than what the job was
    907 	 *	   scheduled on.  This can legitimately occur if,
    908 	 *	   for example, a job is percpu-scheduled on CPU0
    909 	 *	   and then CPU1 attempts to cancel it without taking
    910 	 *	   a remote pool reference.  (this might happen by
    911 	 *	   "luck of the draw").
    912 	 *
    913 	 *	=> "job" is not yet running, but is assigned to the
    914 	 *	   dispatcher.
    915 	 *
    916 	 * When this happens, this code makes the determination that
    917 	 * the job is already running.  The failure mode is that the
    918 	 * caller is told the job is running, and thus has to wait.
    919 	 * The dispatcher will eventually get to it and the job will
    920 	 * proceed as if it had been already running.
    921 	 */
    922 
    923 	if (job->job_thread == NULL) {
    924 		/* Nothing to do.  Guaranteed not running.  */
    925 		return true;
    926 	} else if (job->job_thread == &pool->tp_dispatcher) {
    927 		/* Take it off the list to guarantee it won't run.  */
    928 		job->job_thread = NULL;
    929 		mutex_spin_enter(&pool->tp_lock);
    930 		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
    931 		mutex_spin_exit(&pool->tp_lock);
    932 		threadpool_job_rele(job);
    933 		return true;
    934 	} else {
    935 		/* Too late -- already running.  */
    936 		return false;
    937 	}
    938 }
    939 
    940 void
    941 threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
    942 {
    943 
    944 	/*
    945 	 * We may sleep here, but we can't ASSERT_SLEEPABLE() because
    946 	 * the job lock (used to interlock the cv_wait()) may in fact
    947 	 * legitimately be a spin lock, so the assertion would fire
    948 	 * as a false-positive.
    949 	 */
    950 
    951 	KASSERT(mutex_owned(job->job_lock));
    952 
    953 	if (threadpool_cancel_job_async(pool, job))
    954 		return;
    955 
    956 	/* Already running.  Wait for it to complete.  */
    957 	while (job->job_thread != NULL)
    958 		cv_wait(&job->job_cv, job->job_lock);
    959 }
    960 
    961 /* Thread pool dispatcher thread */
    962 
    963 static void __dead
    964 threadpool_dispatcher_thread(void *arg)
    965 {
    966 	struct threadpool_thread *const dispatcher = arg;
    967 	struct threadpool *const pool = dispatcher->tpt_pool;
    968 	struct lwp *lwp = NULL;
    969 	int ktflags;
    970 	char suffix[16];
    971 	int error;
    972 
    973 	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
    974 	KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));
    975 
    976 	/* Wait until we're initialized.  */
    977 	mutex_spin_enter(&pool->tp_lock);
    978 	while (dispatcher->tpt_lwp == NULL)
    979 		cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);
    980 
    981 	SDT_PROBE1(sdt, kernel, threadpool, dispatcher__start,  pool);
    982 
    983 	for (;;) {
    984 		/* Wait until there's a job.  */
    985 		while (TAILQ_EMPTY(&pool->tp_jobs)) {
    986 			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
    987 				SDT_PROBE1(sdt, kernel, threadpool,
    988 				    dispatcher__dying,  pool);
    989 				break;
    990 			}
    991 			cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);
    992 		}
    993 		if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
    994 			break;
    995 
    996 		/* If there are no threads, we'll have to try to start one.  */
    997 		if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
    998 			SDT_PROBE1(sdt, kernel, threadpool, dispatcher__spawn,
    999 			    pool);
   1000 			threadpool_hold(pool);
   1001 			mutex_spin_exit(&pool->tp_lock);
   1002 
   1003 			struct threadpool_thread *const thread =
   1004 			    pool_cache_get(threadpool_thread_pc, PR_WAITOK);
   1005 			thread->tpt_lwp = NULL;
   1006 			thread->tpt_pool = pool;
   1007 			thread->tpt_job = NULL;
   1008 			cv_init(&thread->tpt_cv, "pooljob");
   1009 
   1010 			ktflags = 0;
   1011 			ktflags |= KTHREAD_MPSAFE;
   1012 			if (pool->tp_pri < PRI_KERNEL)
   1013 				ktflags |= KTHREAD_TS;
   1014 			threadnamesuffix(suffix, sizeof(suffix), pool->tp_cpu,
   1015 			    pool->tp_pri);
   1016 			error = kthread_create(pool->tp_pri, ktflags,
   1017 			    pool->tp_cpu, &threadpool_thread, thread, &lwp,
   1018 			    "poolthread%s", suffix);
   1019 
   1020 			mutex_spin_enter(&pool->tp_lock);
   1021 			if (error) {
   1022 				pool_cache_put(threadpool_thread_pc, thread);
   1023 				threadpool_rele(pool);
   1024 				/* XXX What to do to wait for memory?  */
   1025 				(void)kpause("thrdplcr", false, hz,
   1026 				    &pool->tp_lock);
   1027 				continue;
   1028 			}
   1029 			/*
   1030 			 * New kthread now owns the reference to the pool
   1031 			 * taken above.
   1032 			 */
   1033 			KASSERT(lwp != NULL);
   1034 			TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread,
   1035 			    tpt_entry);
   1036 			thread->tpt_lwp = lwp;
   1037 			lwp = NULL;
   1038 			cv_broadcast(&thread->tpt_cv);
   1039 			continue;
   1040 		}
   1041 
   1042 		/* There are idle threads, so try giving one a job.  */
   1043 		struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);
   1044 		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
   1045 		/*
   1046 		 * Take an extra reference on the job temporarily so that
   1047 		 * it won't disappear on us while we have both locks dropped.
   1048 		 */
   1049 		threadpool_job_hold(job);
   1050 		mutex_spin_exit(&pool->tp_lock);
   1051 
   1052 		mutex_enter(job->job_lock);
   1053 		/* If the job was cancelled, we'll no longer be its thread.  */
   1054 		if (__predict_true(job->job_thread == dispatcher)) {
   1055 			mutex_spin_enter(&pool->tp_lock);
   1056 			if (__predict_false(
   1057 				    TAILQ_EMPTY(&pool->tp_idle_threads))) {
   1058 				/*
   1059 				 * Someone else snagged the thread
   1060 				 * first.  We'll have to try again.
   1061 				 */
   1062 				SDT_PROBE2(sdt, kernel, threadpool,
   1063 				    dispatcher__race,  pool, job);
   1064 				TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
   1065 				    job_entry);
   1066 			} else {
   1067 				/*
   1068 				 * Assign the job to the thread and
   1069 				 * wake the thread so it starts work.
   1070 				 */
   1071 				struct threadpool_thread *const thread =
   1072 				    TAILQ_FIRST(&pool->tp_idle_threads);
   1073 
    1074 				SDT_PROBE3(sdt, kernel, threadpool,
    1075 				    dispatcher__assign,  pool, job, thread->tpt_lwp);
   1076 				KASSERT(thread->tpt_job == NULL);
   1077 				TAILQ_REMOVE(&pool->tp_idle_threads, thread,
   1078 				    tpt_entry);
   1079 				thread->tpt_job = job;
   1080 				job->job_thread = thread;
   1081 				cv_broadcast(&thread->tpt_cv);
   1082 			}
   1083 			mutex_spin_exit(&pool->tp_lock);
   1084 		}
   1085 		threadpool_job_rele(job);
   1086 		mutex_exit(job->job_lock);
   1087 
   1088 		mutex_spin_enter(&pool->tp_lock);
   1089 	}
   1090 	threadpool_rele(pool);
   1091 	mutex_spin_exit(&pool->tp_lock);
   1092 
   1093 	SDT_PROBE1(sdt, kernel, threadpool, dispatcher__exit,  pool);
   1094 
   1095 	kthread_exit(0);
   1096 }
   1097 
   1098 /* Thread pool thread */
   1099 
   1100 static void __dead
   1101 threadpool_thread(void *arg)
   1102 {
   1103 	struct threadpool_thread *const thread = arg;
   1104 	struct threadpool *const pool = thread->tpt_pool;
   1105 
   1106 	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
   1107 	KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));
   1108 
   1109 	/* Wait until we're initialized and on the queue.  */
   1110 	mutex_spin_enter(&pool->tp_lock);
   1111 	while (thread->tpt_lwp == NULL)
   1112 		cv_wait(&thread->tpt_cv, &pool->tp_lock);
   1113 
   1114 	SDT_PROBE1(sdt, kernel, threadpool, thread__start,  pool);
   1115 
   1116 	KASSERT(thread->tpt_lwp == curlwp);
   1117 	for (;;) {
   1118 		/* Wait until we are assigned a job.  */
   1119 		while (thread->tpt_job == NULL) {
   1120 			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
   1121 				SDT_PROBE1(sdt, kernel, threadpool,
   1122 				    thread__dying,  pool);
   1123 				break;
   1124 			}
   1125 			if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
   1126 				mstohz(threadpool_idle_time_ms)))
   1127 				break;
   1128 		}
   1129 		if (__predict_false(thread->tpt_job == NULL)) {
   1130 			TAILQ_REMOVE(&pool->tp_idle_threads, thread,
   1131 			    tpt_entry);
   1132 			break;
   1133 		}
   1134 
   1135 		struct threadpool_job *const job = thread->tpt_job;
   1136 		KASSERT(job != NULL);
   1137 
   1138 		/* Set our lwp name to reflect what job we're doing.  */
   1139 		lwp_lock(curlwp);
   1140 		char *const lwp_name __diagused = curlwp->l_name;
   1141 		thread->tpt_lwp_savedname = curlwp->l_name;
   1142 		curlwp->l_name = job->job_name;
   1143 		lwp_unlock(curlwp);
   1144 
   1145 		mutex_spin_exit(&pool->tp_lock);
   1146 
   1147 		SDT_PROBE2(sdt, kernel, threadpool, thread__job,  pool, job);
   1148 
   1149 		/* Run the job.  */
   1150 		(*job->job_fn)(job);
   1151 
   1152 		/* lwp name restored in threadpool_job_done(). */
   1153 		KASSERTMSG((curlwp->l_name == lwp_name),
   1154 		    "someone forgot to call threadpool_job_done()!");
   1155 
   1156 		/*
    1157 		 * We can compare pointers, but we can no longer dereference
   1158 		 * job after this because threadpool_job_done() drops the
   1159 		 * last reference on the job while the job is locked.
   1160 		 */
   1161 
   1162 		mutex_spin_enter(&pool->tp_lock);
   1163 		KASSERT(thread->tpt_job == job);
   1164 		thread->tpt_job = NULL;
   1165 		TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
   1166 	}
   1167 	threadpool_rele(pool);
   1168 	mutex_spin_exit(&pool->tp_lock);
   1169 
   1170 	SDT_PROBE1(sdt, kernel, threadpool, thread__exit,  pool);
   1171 
   1172 	KASSERT(!cv_has_waiters(&thread->tpt_cv));
   1173 	cv_destroy(&thread->tpt_cv);
   1174 	pool_cache_put(threadpool_thread_pc, thread);
   1175 	kthread_exit(0);
   1176 }
   1177