/*	$NetBSD: kern_threadpool.c,v 1.6 2018/12/26 20:30:36 thorpej Exp $	*/

/*-
 * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Taylor R. Campbell and Jason R. Thorpe.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Thread pools.
 *
 * A thread pool is a collection of worker threads idle or running
 * jobs, together with an overseer thread that does not run jobs but
 * can be given jobs to assign to a worker thread.  Scheduling a job in
 * a thread pool does not allocate or even sleep at all, except perhaps
 * on an adaptive lock, unlike kthread_create.  Jobs reuse threads, so
 * they do not incur the expense of creating and destroying kthreads
 * unless there is not much work to be done.
 *
 * A per-CPU thread pool (threadpool_percpu) is a collection of thread
 * pools, one per CPU bound to that CPU.  For each priority level in
 * use, there is one shared unbound thread pool (i.e., pool of threads
 * not bound to any CPU) and one shared per-CPU thread pool.
 *
 * To use the unbound thread pool at priority pri, call
 * threadpool_get(&pool, pri).  When you're done, call
 * threadpool_put(pool, pri).
 *
 * To use the per-CPU thread pools at priority pri, call
 * threadpool_percpu_get(&pool_percpu, pri), and then use the thread
 * pool returned by threadpool_percpu_ref(pool_percpu) for the current
 * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another
 * CPU.  When you're done, call threadpool_percpu_put(pool_percpu,
 * pri).
 *
 * +--MACHINE-----------------------------------------------+
 * | +--CPU 0-------+ +--CPU 1-------+     +--CPU n-------+ |
 * | | <overseer 0> | | <overseer 1> | ... | <overseer n> | |
 * | | <idle 0a>    | | <running 1a> | ... | <idle na>    | |
 * | | <running 0b> | | <running 1b> | ... | <idle nb>    | |
 * | |      .       | |      .       | ... |      .       | |
 * | |      .       | |      .       | ... |      .       | |
 * | |      .       | |      .       | ... |      .       | |
 * | +--------------+ +--------------+     +--------------+ |
 * |             +--unbound---------+                       |
 * |             | <overseer n+1>   |                       |
 * |             | <idle (n+1)a>    |                       |
 * |             | <running (n+1)b> |                       |
 * |             +------------------+                       |
 * +--------------------------------------------------------+
 *
 * XXX Why one overseer per CPU?  I did that originally to avoid
 * touching remote CPUs' memory when scheduling a job, but that still
 * requires interprocessor synchronization.  Perhaps we could get by
 * with a single overseer thread, at the expense of another pointer in
 * struct threadpool_job to identify the CPU on which it must run
 * in order for the overseer to schedule it correctly.
 */
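
/*
 * Illustrative usage sketch.  The example_softc structure and
 * example_work_fn function below are hypothetical caller-side names,
 * shown only to tie the calls above together:
 *
 *	struct example_softc {
 *		kmutex_t		sc_lock;
 *		struct threadpool	*sc_pool;
 *		struct threadpool_job	sc_job;
 *	};
 *
 *	static void
 *	example_work_fn(struct threadpool_job *job)
 *	{
 *		struct example_softc *sc =
 *		    container_of(job, struct example_softc, sc_job);
 *
 *		... do the work ...
 *
 *		mutex_enter(&sc->sc_lock);
 *		threadpool_job_done(job);
 *		mutex_exit(&sc->sc_lock);
 *	}
 *
 * Setup:
 *
 *	mutex_init(&sc->sc_lock, MUTEX_DEFAULT, IPL_NONE);
 *	error = threadpool_get(&sc->sc_pool, PRI_NONE);
 *	threadpool_job_init(&sc->sc_job, example_work_fn, &sc->sc_lock,
 *	    "examplejob");
 *
 * Scheduling (cheap, may be repeated; a no-op if already running):
 *
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_schedule_job(sc->sc_pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 *
 * Teardown:
 *
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_cancel_job(sc->sc_pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 *	threadpool_job_destroy(&sc->sc_job);
 *	threadpool_put(sc->sc_pool, PRI_NONE);
 */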

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.6 2018/12/26 20:30:36 thorpej Exp $");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/atomic.h>
#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/kthread.h>
#include <sys/mutex.h>
#include <sys/once.h>
#include <sys/percpu.h>
#include <sys/pool.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/systm.h>
#include <sys/threadpool.h>

static ONCE_DECL(threadpool_init_once)

#define THREADPOOL_INIT() \
do { \
        int threadpool_init_error __diagused = \
            RUN_ONCE(&threadpool_init_once, threadpools_init); \
        KASSERT(threadpool_init_error == 0); \
} while (/*CONSTCOND*/0)

/* Data structures */

TAILQ_HEAD(job_head, threadpool_job);
TAILQ_HEAD(thread_head, threadpool_thread);

struct threadpool_thread {
        struct lwp *tpt_lwp;
        struct threadpool *tpt_pool;
        struct threadpool_job *tpt_job;
        kcondvar_t tpt_cv;
        TAILQ_ENTRY(threadpool_thread) tpt_entry;
};

struct threadpool {
        kmutex_t tp_lock;
        struct threadpool_thread tp_overseer;
        struct job_head tp_jobs;
        struct thread_head tp_idle_threads;
        unsigned int tp_refcnt;
        int tp_flags;
#define THREADPOOL_DYING 0x01
        struct cpu_info *tp_cpu;
        pri_t tp_pri;
};

static int threadpool_hold(struct threadpool *);
static void threadpool_rele(struct threadpool *);

static int threadpool_percpu_create(struct threadpool_percpu **, pri_t);
static void threadpool_percpu_destroy(struct threadpool_percpu *);

static void threadpool_job_dead(struct threadpool_job *);

static int threadpool_job_hold(struct threadpool_job *);
static void threadpool_job_rele(struct threadpool_job *);

static void threadpool_overseer_thread(void *) __dead;
static void threadpool_thread(void *) __dead;

static pool_cache_t threadpool_thread_pc __read_mostly;

static kmutex_t threadpools_lock __cacheline_aligned;

/* Idle out threads after 30 seconds */
#define THREADPOOL_IDLE_TICKS mstohz(30 * 1000)

struct threadpool_unbound {
        struct threadpool tpu_pool;

        /* protected by threadpools_lock */
        LIST_ENTRY(threadpool_unbound) tpu_link;
        uint64_t tpu_refcnt;
};

static LIST_HEAD(, threadpool_unbound) unbound_threadpools;

static struct threadpool_unbound *
threadpool_lookup_unbound(pri_t pri)
{
        struct threadpool_unbound *tpu;

        LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) {
                if (tpu->tpu_pool.tp_pri == pri)
                        return tpu;
        }
        return NULL;
}

static void
threadpool_insert_unbound(struct threadpool_unbound *tpu)
{
        KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL);
        LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link);
}

static void
threadpool_remove_unbound(struct threadpool_unbound *tpu)
{
        KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu);
        LIST_REMOVE(tpu, tpu_link);
}

struct threadpool_percpu {
        percpu_t *tpp_percpu;
        pri_t tpp_pri;

        /* protected by threadpools_lock */
        LIST_ENTRY(threadpool_percpu) tpp_link;
        uint64_t tpp_refcnt;
};

static LIST_HEAD(, threadpool_percpu) percpu_threadpools;

static struct threadpool_percpu *
threadpool_lookup_percpu(pri_t pri)
{
        struct threadpool_percpu *tpp;

        LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) {
                if (tpp->tpp_pri == pri)
                        return tpp;
        }
        return NULL;
}

static void
threadpool_insert_percpu(struct threadpool_percpu *tpp)
{
        KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL);
        LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link);
}

static void
threadpool_remove_percpu(struct threadpool_percpu *tpp)
{
        KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp);
        LIST_REMOVE(tpp, tpp_link);
}

#ifdef THREADPOOL_VERBOSE
#define TP_LOG(x)	printf x
#else
#define TP_LOG(x)	/* nothing */
#endif /* THREADPOOL_VERBOSE */

static int
threadpools_init(void)
{

        threadpool_thread_pc =
            pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0,
            "thplthrd", NULL, IPL_NONE, NULL, NULL, NULL);

        LIST_INIT(&unbound_threadpools);
        LIST_INIT(&percpu_threadpools);
        mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);

        TP_LOG(("%s: sizeof(threadpool_job) = %zu\n",
            __func__, sizeof(struct threadpool_job)));

        return 0;
}

/* Thread pool creation */

static bool
threadpool_pri_is_valid(pri_t pri)
{
        return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT));
}

static int
threadpool_create(struct threadpool *const pool, struct cpu_info *ci,
    pri_t pri)
{
        struct lwp *lwp;
        int ktflags;
        int error;

        KASSERT(threadpool_pri_is_valid(pri));

        mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
        /* XXX overseer */
        TAILQ_INIT(&pool->tp_jobs);
        TAILQ_INIT(&pool->tp_idle_threads);
        pool->tp_refcnt = 0;
        pool->tp_flags = 0;
        pool->tp_cpu = ci;
        pool->tp_pri = pri;

        error = threadpool_hold(pool);
        KASSERT(error == 0);
        pool->tp_overseer.tpt_lwp = NULL;
        pool->tp_overseer.tpt_pool = pool;
        pool->tp_overseer.tpt_job = NULL;
        cv_init(&pool->tp_overseer.tpt_cv, "poolover");

        ktflags = 0;
        ktflags |= KTHREAD_MPSAFE;
        if (pri < PRI_KERNEL)
                ktflags |= KTHREAD_TS;
        error = kthread_create(pri, ktflags, ci, &threadpool_overseer_thread,
            &pool->tp_overseer, &lwp,
            "pooloverseer/%d@%d", (ci ? cpu_index(ci) : -1), (int)pri);
        if (error)
                goto fail0;

        mutex_spin_enter(&pool->tp_lock);
        pool->tp_overseer.tpt_lwp = lwp;
        cv_broadcast(&pool->tp_overseer.tpt_cv);
        mutex_spin_exit(&pool->tp_lock);

        return 0;

fail0:  KASSERT(error);
        KASSERT(pool->tp_overseer.tpt_job == NULL);
        KASSERT(pool->tp_overseer.tpt_pool == pool);
        KASSERT(pool->tp_flags == 0);
        KASSERT(pool->tp_refcnt == 0);
        KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
        KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
        KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
        cv_destroy(&pool->tp_overseer.tpt_cv);
        mutex_destroy(&pool->tp_lock);
        return error;
}

/* Thread pool destruction */

static void
threadpool_destroy(struct threadpool *pool)
{
        struct threadpool_thread *thread;

        /* Mark the pool dying and wait for threads to commit suicide. */
        mutex_spin_enter(&pool->tp_lock);
        KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
        pool->tp_flags |= THREADPOOL_DYING;
        cv_broadcast(&pool->tp_overseer.tpt_cv);
        TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
                cv_broadcast(&thread->tpt_cv);
        while (0 < pool->tp_refcnt) {
                TP_LOG(("%s: draining %u references...\n", __func__,
                    pool->tp_refcnt));
                cv_wait(&pool->tp_overseer.tpt_cv, &pool->tp_lock);
        }
        mutex_spin_exit(&pool->tp_lock);

        KASSERT(pool->tp_overseer.tpt_job == NULL);
        KASSERT(pool->tp_overseer.tpt_pool == pool);
        KASSERT(pool->tp_flags == THREADPOOL_DYING);
        KASSERT(pool->tp_refcnt == 0);
        KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
        KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
        KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
        cv_destroy(&pool->tp_overseer.tpt_cv);
        mutex_destroy(&pool->tp_lock);
}

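/*
 * Pool reference counting.  tp_refcnt is adjusted with atomic
 * compare-and-swap so that holders need not take tp_lock.  Dropping
 * the last reference takes tp_lock around the decrement so that the
 * wakeup of threadpool_destroy(), which waits under tp_lock for
 * tp_refcnt to drain, cannot be missed.
 */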
static int
threadpool_hold(struct threadpool *pool)
{
        unsigned int refcnt;

        do {
                refcnt = pool->tp_refcnt;
                if (refcnt == UINT_MAX)
                        return EBUSY;
        } while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt + 1))
            != refcnt);

        return 0;
}

static void
threadpool_rele(struct threadpool *pool)
{
        unsigned int refcnt;

        do {
                refcnt = pool->tp_refcnt;
                KASSERT(0 < refcnt);
                if (refcnt == 1) {
                        mutex_spin_enter(&pool->tp_lock);
                        refcnt = atomic_dec_uint_nv(&pool->tp_refcnt);
                        KASSERT(refcnt != UINT_MAX);
                        if (refcnt == 0)
                                cv_broadcast(&pool->tp_overseer.tpt_cv);
                        mutex_spin_exit(&pool->tp_lock);
                        return;
                }
        } while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt - 1))
            != refcnt);
}

/* Unbound thread pools */

int
threadpool_get(struct threadpool **poolp, pri_t pri)
{
        struct threadpool_unbound *tpu, *tmp = NULL;
        int error;

        THREADPOOL_INIT();

        ASSERT_SLEEPABLE();

        if (! threadpool_pri_is_valid(pri))
                return EINVAL;

        mutex_enter(&threadpools_lock);
        tpu = threadpool_lookup_unbound(pri);
        if (tpu == NULL) {
                mutex_exit(&threadpools_lock);
                TP_LOG(("%s: No pool for pri=%d, creating one.\n",
                    __func__, (int)pri));
                tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP);
                error = threadpool_create(&tmp->tpu_pool, NULL, pri);
                if (error) {
                        kmem_free(tmp, sizeof(*tmp));
                        return error;
                }
                mutex_enter(&threadpools_lock);
                tpu = threadpool_lookup_unbound(pri);
                if (tpu == NULL) {
                        TP_LOG(("%s: Won the creation race for pri=%d.\n",
                            __func__, (int)pri));
                        tpu = tmp;
                        tmp = NULL;
                        threadpool_insert_unbound(tpu);
                }
        }
        KASSERT(tpu != NULL);
        tpu->tpu_refcnt++;
        KASSERT(tpu->tpu_refcnt != 0);
        mutex_exit(&threadpools_lock);

        if (tmp != NULL) {
                threadpool_destroy(&tmp->tpu_pool);
                kmem_free(tmp, sizeof(*tmp));
        }
        KASSERT(tpu != NULL);
        *poolp = &tpu->tpu_pool;
        return 0;
}

void
threadpool_put(struct threadpool *pool, pri_t pri)
{
        struct threadpool_unbound *tpu =
            container_of(pool, struct threadpool_unbound, tpu_pool);

        THREADPOOL_INIT();

        ASSERT_SLEEPABLE();

        KASSERT(threadpool_pri_is_valid(pri));

        mutex_enter(&threadpools_lock);
        KASSERT(tpu == threadpool_lookup_unbound(pri));
        KASSERT(0 < tpu->tpu_refcnt);
        if (--tpu->tpu_refcnt == 0) {
                TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
                    __func__, (int)pri));
                threadpool_remove_unbound(tpu);
        } else {
                tpu = NULL;
        }
        mutex_exit(&threadpools_lock);

        if (tpu) {
                threadpool_destroy(&tpu->tpu_pool);
                kmem_free(tpu, sizeof(*tpu));
        }
}

/* Per-CPU thread pools */

int
threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri)
{
        struct threadpool_percpu *pool_percpu, *tmp = NULL;
        int error;

        THREADPOOL_INIT();

        ASSERT_SLEEPABLE();

        if (! threadpool_pri_is_valid(pri))
                return EINVAL;

        mutex_enter(&threadpools_lock);
        pool_percpu = threadpool_lookup_percpu(pri);
        if (pool_percpu == NULL) {
                mutex_exit(&threadpools_lock);
                TP_LOG(("%s: No pool for pri=%d, creating one.\n",
                    __func__, (int)pri));
                error = threadpool_percpu_create(&tmp, pri);
                if (error)
                        return error;
                KASSERT(tmp != NULL);
                mutex_enter(&threadpools_lock);
                pool_percpu = threadpool_lookup_percpu(pri);
                if (pool_percpu == NULL) {
                        TP_LOG(("%s: Won the creation race for pri=%d.\n",
                            __func__, (int)pri));
                        pool_percpu = tmp;
                        tmp = NULL;
                        threadpool_insert_percpu(pool_percpu);
                }
        }
        KASSERT(pool_percpu != NULL);
        pool_percpu->tpp_refcnt++;
        KASSERT(pool_percpu->tpp_refcnt != 0);
        mutex_exit(&threadpools_lock);

        if (tmp != NULL)
                threadpool_percpu_destroy(tmp);
        KASSERT(pool_percpu != NULL);
        *pool_percpup = pool_percpu;
        return 0;
}

void
threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri)
{

        THREADPOOL_INIT();

        ASSERT_SLEEPABLE();

        KASSERT(threadpool_pri_is_valid(pri));

        mutex_enter(&threadpools_lock);
        KASSERT(pool_percpu == threadpool_lookup_percpu(pri));
        KASSERT(0 < pool_percpu->tpp_refcnt);
        if (--pool_percpu->tpp_refcnt == 0) {
                TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
                    __func__, (int)pri));
                threadpool_remove_percpu(pool_percpu);
        } else {
                pool_percpu = NULL;
        }
        mutex_exit(&threadpools_lock);

        if (pool_percpu)
                threadpool_percpu_destroy(pool_percpu);
}

struct threadpool *
threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
{
        struct threadpool **poolp, *pool;

        poolp = percpu_getref(pool_percpu->tpp_percpu);
        pool = *poolp;
        percpu_putref(pool_percpu->tpp_percpu);

        return pool;
}

struct threadpool *
threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu,
    struct cpu_info *ci)
{
        struct threadpool **poolp, *pool;

        percpu_traverse_enter();
        poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
        pool = *poolp;
        percpu_traverse_exit();

        return pool;
}
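
/*
 * For example (an illustrative sketch; sc, sc_lock, and sc_job are
 * hypothetical caller-side names), a caller wanting per-CPU dispatch
 * might do:
 *
 *	error = threadpool_percpu_get(&sc->sc_pool_percpu, PRI_NONE);
 *	...
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_schedule_job(threadpool_percpu_ref(sc->sc_pool_percpu),
 *	    &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 *
 * which schedules the job on the thread pool bound to the current CPU.
 */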

static int
threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri)
{
        struct threadpool_percpu *pool_percpu;
        struct cpu_info *ci;
        CPU_INFO_ITERATOR cii;
        unsigned int i, j;
        int error;

        pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP);
        if (pool_percpu == NULL) {
                error = ENOMEM;
                goto fail0;
        }
        pool_percpu->tpp_pri = pri;

        pool_percpu->tpp_percpu = percpu_alloc(sizeof(struct threadpool *));
        if (pool_percpu->tpp_percpu == NULL) {
                error = ENOMEM;
                goto fail1;
        }

        for (i = 0, CPU_INFO_FOREACH(cii, ci), i++) {
                struct threadpool *pool;

                pool = kmem_zalloc(sizeof(*pool), KM_SLEEP);
                error = threadpool_create(pool, ci, pri);
                if (error) {
                        kmem_free(pool, sizeof(*pool));
                        goto fail2;
                }
                percpu_traverse_enter();
                struct threadpool **const poolp =
                    percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
                *poolp = pool;
                percpu_traverse_exit();
        }

        /* Success! */
        *pool_percpup = (struct threadpool_percpu *)pool_percpu;
        return 0;

fail2:  for (j = 0, CPU_INFO_FOREACH(cii, ci), j++) {
                if (i <= j)
                        break;
                percpu_traverse_enter();
                struct threadpool **const poolp =
                    percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
                struct threadpool *const pool = *poolp;
                percpu_traverse_exit();
                threadpool_destroy(pool);
                kmem_free(pool, sizeof(*pool));
        }
        percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
fail1:  kmem_free(pool_percpu, sizeof(*pool_percpu));
fail0:  return error;
}

static void
threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
{
        struct cpu_info *ci;
        CPU_INFO_ITERATOR cii;

        for (CPU_INFO_FOREACH(cii, ci)) {
                percpu_traverse_enter();
                struct threadpool **const poolp =
                    percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
                struct threadpool *const pool = *poolp;
                percpu_traverse_exit();
                threadpool_destroy(pool);
                kmem_free(pool, sizeof(*pool));
        }

        percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
        kmem_free(pool_percpu, sizeof(*pool_percpu));
}

/* Thread pool jobs */

void __printflike(4,5)
threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
    kmutex_t *lock, const char *fmt, ...)
{
        va_list ap;

        va_start(ap, fmt);
        (void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
        va_end(ap);

        job->job_lock = lock;
        job->job_thread = NULL;
        job->job_refcnt = 0;
        cv_init(&job->job_cv, job->job_name);
        job->job_fn = fn;
}

static void
threadpool_job_dead(struct threadpool_job *job)
{

        panic("threadpool job %p ran after destruction", job);
}

void
threadpool_job_destroy(struct threadpool_job *job)
{

        ASSERT_SLEEPABLE();

        KASSERTMSG((job->job_thread == NULL), "job %p still running", job);

        mutex_enter(job->job_lock);
        while (0 < job->job_refcnt)
                cv_wait(&job->job_cv, job->job_lock);
        mutex_exit(job->job_lock);

        job->job_lock = NULL;
        KASSERT(job->job_thread == NULL);
        KASSERT(job->job_refcnt == 0);
        KASSERT(!cv_has_waiters(&job->job_cv));
        cv_destroy(&job->job_cv);
        job->job_fn = threadpool_job_dead;
        (void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
}

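/*
 * Job reference counting.  job_refcnt counts the overseer and worker
 * threads currently holding a pointer to the job; it is adjusted with
 * atomic compare-and-swap so the common case takes no lock.  The last
 * release is performed under job_lock so that threadpool_job_destroy(),
 * which waits under the same lock for job_refcnt to drain, cannot miss
 * the wakeup on job_cv.
 */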
static int
threadpool_job_hold(struct threadpool_job *job)
{
        unsigned int refcnt;
        do {
                refcnt = job->job_refcnt;
                if (refcnt == UINT_MAX)
                        return EBUSY;
        } while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1))
            != refcnt);

        return 0;
}

static void
threadpool_job_rele(struct threadpool_job *job)
{
        unsigned int refcnt;

        do {
                refcnt = job->job_refcnt;
                KASSERT(0 < refcnt);
                if (refcnt == 1) {
                        mutex_enter(job->job_lock);
                        refcnt = atomic_dec_uint_nv(&job->job_refcnt);
                        KASSERT(refcnt != UINT_MAX);
                        if (refcnt == 0)
                                cv_broadcast(&job->job_cv);
                        mutex_exit(job->job_lock);
                        return;
                }
        } while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt - 1))
            != refcnt);
}

void
threadpool_job_done(struct threadpool_job *job)
{

        KASSERT(mutex_owned(job->job_lock));
        KASSERT(job->job_thread != NULL);
        KASSERT(job->job_thread->tpt_lwp == curlwp);

        cv_broadcast(&job->job_cv);
        job->job_thread = NULL;
}

void
threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
{

        KASSERT(mutex_owned(job->job_lock));

        /*
         * If the job's already running, let it keep running.  The job
         * is guaranteed by the interlock not to end early -- if it had
         * ended early, threadpool_job_done would have set job_thread
         * to NULL under the interlock.
         */
        if (__predict_true(job->job_thread != NULL)) {
                TP_LOG(("%s: job '%s' already running.\n",
                    __func__, job->job_name));
                return;
        }

        /* Otherwise, try to assign a thread to the job. */
        mutex_spin_enter(&pool->tp_lock);
        if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
                /* Nobody's idle.  Give it to the overseer. */
                TP_LOG(("%s: giving job '%s' to overseer.\n",
                    __func__, job->job_name));
                job->job_thread = &pool->tp_overseer;
                TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
        } else {
                /* Assign it to the first idle thread. */
                job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
                TP_LOG(("%s: giving job '%s' to idle thread %p.\n",
                    __func__, job->job_name, job->job_thread));
                TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
                    tpt_entry);
                threadpool_job_hold(job);
                job->job_thread->tpt_job = job;
        }

        /* Notify whomever we gave it to, overseer or idle thread. */
        KASSERT(job->job_thread != NULL);
        cv_broadcast(&job->job_thread->tpt_cv);
        mutex_spin_exit(&pool->tp_lock);
}

bool
threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
{

        KASSERT(mutex_owned(job->job_lock));

        /*
         * XXXJRT This fails (albeit safely) when all of the following
         * are true:
         *
         *	=> "pool" is something other than what the job was
         *	   scheduled on.  This can legitimately occur if,
         *	   for example, a job is percpu-scheduled on CPU0
         *	   and then CPU1 attempts to cancel it without taking
         *	   a remote pool reference.  (this might happen by
         *	   "luck of the draw").
         *
         *	=> "job" is not yet running, but is assigned to the
         *	   overseer.
         *
         * When this happens, this code makes the determination that
         * the job is already running.  The failure mode is that the
         * caller is told the job is running, and thus has to wait.
         * The overseer will eventually get to it and the job will
         * proceed as if it had been already running.
         */

        if (job->job_thread == NULL) {
                /* Nothing to do.  Guaranteed not running. */
                return true;
        } else if (job->job_thread == &pool->tp_overseer) {
                /* Take it off the list to guarantee it won't run. */
                job->job_thread = NULL;
                mutex_spin_enter(&pool->tp_lock);
                TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
                mutex_spin_exit(&pool->tp_lock);
                return true;
        } else {
                /* Too late -- already running. */
                return false;
        }
}

void
threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
{

        ASSERT_SLEEPABLE();

        KASSERT(mutex_owned(job->job_lock));

        if (threadpool_cancel_job_async(pool, job))
                return;

        /* Already running.  Wait for it to complete. */
        while (job->job_thread != NULL)
                cv_wait(&job->job_cv, job->job_lock);
}

/* Thread pool overseer thread */

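/*
 * The overseer never runs jobs itself.  It creates worker threads on
 * demand, and hands each job that was queued on tp_jobs (because no
 * worker was idle when threadpool_schedule_job() ran) to the first
 * idle worker it can find.
 */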
static void __dead
threadpool_overseer_thread(void *arg)
{
        struct threadpool_thread *const overseer = arg;
        struct threadpool *const pool = overseer->tpt_pool;
        struct lwp *lwp = NULL;
        int ktflags;
        int error;

        KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));

        /* Wait until we're initialized. */
        mutex_spin_enter(&pool->tp_lock);
        while (overseer->tpt_lwp == NULL)
                cv_wait(&overseer->tpt_cv, &pool->tp_lock);

        TP_LOG(("%s: starting.\n", __func__));

        for (;;) {
                /* Wait until there's a job. */
                while (TAILQ_EMPTY(&pool->tp_jobs)) {
                        if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
                                TP_LOG(("%s: THREADPOOL_DYING\n",
                                    __func__));
                                break;
                        }
                        cv_wait(&overseer->tpt_cv, &pool->tp_lock);
                }
                if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
                        break;

                /* If there are no threads, we'll have to try to start one. */
                if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
                        TP_LOG(("%s: Got a job, need to create a thread.\n",
                            __func__));
                        error = threadpool_hold(pool);
                        if (error) {
                                (void)kpause("thrdplrf", false, hz,
                                    &pool->tp_lock);
                                continue;
                        }
                        mutex_spin_exit(&pool->tp_lock);

                        struct threadpool_thread *const thread =
                            pool_cache_get(threadpool_thread_pc, PR_WAITOK);
                        thread->tpt_lwp = NULL;
                        thread->tpt_pool = pool;
                        thread->tpt_job = NULL;
                        cv_init(&thread->tpt_cv, "poolthrd");

                        ktflags = 0;
                        ktflags |= KTHREAD_MPSAFE;
                        if (pool->tp_pri < PRI_KERNEL)
                                ktflags |= KTHREAD_TS;
                        error = kthread_create(pool->tp_pri, ktflags,
                            pool->tp_cpu, &threadpool_thread, thread, &lwp,
                            "poolthread/%d@%d",
                            (pool->tp_cpu ? cpu_index(pool->tp_cpu) : -1),
                            (int)pool->tp_pri);

                        mutex_spin_enter(&pool->tp_lock);
                        if (error) {
                                pool_cache_put(threadpool_thread_pc, thread);
                                threadpool_rele(pool);
                                /* XXX What to do to wait for memory? */
                                (void)kpause("thrdplcr", false, hz,
                                    &pool->tp_lock);
                                continue;
                        }
                        KASSERT(lwp != NULL);
                        TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread,
                            tpt_entry);
                        thread->tpt_lwp = lwp;
                        lwp = NULL;
                        cv_broadcast(&thread->tpt_cv);
                        continue;
                }

                /* There are idle threads, so try giving one a job. */
                bool rele_job = true;
                struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);
                TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
                error = threadpool_job_hold(job);
                if (error) {
                        TAILQ_INSERT_HEAD(&pool->tp_jobs, job, job_entry);
                        (void)kpause("pooljob", false, hz, &pool->tp_lock);
                        continue;
                }
                mutex_spin_exit(&pool->tp_lock);

                mutex_enter(job->job_lock);
                /* If the job was cancelled, we'll no longer be its thread. */
                if (__predict_true(job->job_thread == overseer)) {
                        mutex_spin_enter(&pool->tp_lock);
                        if (__predict_false(
                            TAILQ_EMPTY(&pool->tp_idle_threads))) {
                                /*
                                 * Someone else snagged the thread
                                 * first.  We'll have to try again.
                                 */
                                TP_LOG(("%s: '%s' lost race to use idle thread.\n",
                                    __func__, job->job_name));
                                TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
                                    job_entry);
                        } else {
                                /*
                                 * Assign the job to the thread and
                                 * wake the thread so it starts work.
                                 */
                                struct threadpool_thread *const thread =
                                    TAILQ_FIRST(&pool->tp_idle_threads);

                                TP_LOG(("%s: '%s' gets thread %p\n",
                                    __func__, job->job_name, thread));
                                KASSERT(thread->tpt_job == NULL);
                                TAILQ_REMOVE(&pool->tp_idle_threads, thread,
                                    tpt_entry);
                                thread->tpt_job = job;
                                job->job_thread = thread;
                                cv_broadcast(&thread->tpt_cv);
                                /* Gave the thread our job reference. */
                                rele_job = false;
                        }
                        mutex_spin_exit(&pool->tp_lock);
                }
                mutex_exit(job->job_lock);
                if (__predict_false(rele_job))
                        threadpool_job_rele(job);

                mutex_spin_enter(&pool->tp_lock);
        }
        mutex_spin_exit(&pool->tp_lock);

        TP_LOG(("%s: exiting.\n", __func__));

        threadpool_rele(pool);
        kthread_exit(0);
}

/* Thread pool thread */

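/*
 * Each worker waits up to THREADPOOL_IDLE_TICKS for a job to be
 * assigned to it.  If none arrives in that time, or the pool is dying,
 * the worker removes itself from the idle list and exits.
 */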
static void __dead
threadpool_thread(void *arg)
{
        struct threadpool_thread *const thread = arg;
        struct threadpool *const pool = thread->tpt_pool;

        KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));

        /* Wait until we're initialized and on the queue. */
        mutex_spin_enter(&pool->tp_lock);
        while (thread->tpt_lwp == NULL)
                cv_wait(&thread->tpt_cv, &pool->tp_lock);

        TP_LOG(("%s: starting.\n", __func__));

        KASSERT(thread->tpt_lwp == curlwp);
        for (;;) {
                /* Wait until we are assigned a job. */
                while (thread->tpt_job == NULL) {
                        if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
                                TP_LOG(("%s: THREADPOOL_DYING\n",
                                    __func__));
                                break;
                        }
                        if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
                            THREADPOOL_IDLE_TICKS))
                                break;
                }
                if (__predict_false(thread->tpt_job == NULL)) {
                        TAILQ_REMOVE(&pool->tp_idle_threads, thread,
                            tpt_entry);
                        break;
                }

                struct threadpool_job *const job = thread->tpt_job;
                KASSERT(job != NULL);
                mutex_spin_exit(&pool->tp_lock);

                TP_LOG(("%s: running job '%s' on thread %p.\n",
                    __func__, job->job_name, thread));

                /* Set our lwp name to reflect what job we're doing. */
                lwp_lock(curlwp);
                char *const lwp_name = curlwp->l_name;
                curlwp->l_name = job->job_name;
                lwp_unlock(curlwp);

                /* Run the job. */
                (*job->job_fn)(job);

                /* Restore our lwp name. */
                lwp_lock(curlwp);
                curlwp->l_name = lwp_name;
                lwp_unlock(curlwp);

                /* Job is done and its name is unreferenced.  Release it. */
                threadpool_job_rele(job);

                mutex_spin_enter(&pool->tp_lock);
                KASSERT(thread->tpt_job == job);
                thread->tpt_job = NULL;
                TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
        }
        mutex_spin_exit(&pool->tp_lock);

        TP_LOG(("%s: thread %p exiting.\n", __func__, thread));

        KASSERT(!cv_has_waiters(&thread->tpt_cv));
        cv_destroy(&thread->tpt_cv);
        pool_cache_put(threadpool_thread_pc, thread);
        threadpool_rele(pool);
        kthread_exit(0);
}