1 1.23 riastrad /* $NetBSD: kern_threadpool.c,v 1.23 2021/01/23 16:33:49 riastradh Exp $ */ 2 1.1 thorpej 3 1.1 thorpej /*- 4 1.1 thorpej * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc. 5 1.1 thorpej * All rights reserved. 6 1.1 thorpej * 7 1.1 thorpej * This code is derived from software contributed to The NetBSD Foundation 8 1.1 thorpej * by Taylor R. Campbell and Jason R. Thorpe. 9 1.1 thorpej * 10 1.1 thorpej * Redistribution and use in source and binary forms, with or without 11 1.1 thorpej * modification, are permitted provided that the following conditions 12 1.1 thorpej * are met: 13 1.1 thorpej * 1. Redistributions of source code must retain the above copyright 14 1.1 thorpej * notice, this list of conditions and the following disclaimer. 15 1.1 thorpej * 2. Redistributions in binary form must reproduce the above copyright 16 1.1 thorpej * notice, this list of conditions and the following disclaimer in the 17 1.1 thorpej * documentation and/or other materials provided with the distribution. 18 1.1 thorpej * 19 1.1 thorpej * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS 20 1.1 thorpej * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 21 1.1 thorpej * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 22 1.1 thorpej * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS 23 1.1 thorpej * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 24 1.1 thorpej * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 25 1.1 thorpej * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 26 1.1 thorpej * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 27 1.1 thorpej * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 28 1.1 thorpej * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 29 1.1 thorpej * POSSIBILITY OF SUCH DAMAGE. 30 1.1 thorpej */ 31 1.1 thorpej 32 1.1 thorpej /* 33 1.1 thorpej * Thread pools. 34 1.1 thorpej * 35 1.1 thorpej * A thread pool is a collection of worker threads idle or running 36 1.22 skrll * jobs, together with a dispatcher thread that does not run jobs but 37 1.1 thorpej * can be given jobs to assign to a worker thread. Scheduling a job in 38 1.1 thorpej * a thread pool does not allocate or even sleep at all, except perhaps 39 1.1 thorpej * on an adaptive lock, unlike kthread_create. Jobs reuse threads, so 40 1.1 thorpej * they do not incur the expense of creating and destroying kthreads 41 1.1 thorpej * unless there is not much work to be done. 42 1.1 thorpej * 43 1.1 thorpej * A per-CPU thread pool (threadpool_percpu) is a collection of thread 44 1.1 thorpej * pools, one per CPU bound to that CPU. For each priority level in 45 1.1 thorpej * use, there is one shared unbound thread pool (i.e., pool of threads 46 1.1 thorpej * not bound to any CPU) and one shared per-CPU thread pool. 47 1.1 thorpej * 48 1.1 thorpej * To use the unbound thread pool at priority pri, call 49 1.1 thorpej * threadpool_get(&pool, pri). When you're done, call 50 1.1 thorpej * threadpool_put(pool, pri). 51 1.1 thorpej * 52 1.1 thorpej * To use the per-CPU thread pools at priority pri, call 53 1.1 thorpej * threadpool_percpu_get(&pool_percpu, pri), and then use the thread 54 1.1 thorpej * pool returned by threadpool_percpu_ref(pool_percpu) for the current 55 1.1 thorpej * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another 56 1.1 thorpej * CPU. When you're done, call threadpool_percpu_put(pool_percpu, 57 1.1 thorpej * pri). 58 1.1 thorpej * 59 1.21 riastrad * +--MACHINE-----------------------------------------------------+ 60 1.21 riastrad * | +--CPU 0---------+ +--CPU 1---------+ +--CPU n---------+ | 61 1.21 riastrad * | | <dispatcher 0> | | <dispatcher 1> | ... | <dispatcher n> | | 62 1.21 riastrad * | | <idle 0a> | | <running 1a> | ... | <idle na> | | 63 1.21 riastrad * | | <running 0b> | | <running 1b> | ... | <idle nb> | | 64 1.21 riastrad * | | . | | . | ... | . | | 65 1.21 riastrad * | | . | | . | ... | . | | 66 1.21 riastrad * | | . | | . | ... | . | | 67 1.21 riastrad * | +----------------+ +----------------+ +----------------+ | 68 1.21 riastrad * | +--unbound-----------+ | 69 1.21 riastrad * | | <dispatcher n+1> | | 70 1.21 riastrad * | | <idle (n+1)a> | | 71 1.21 riastrad * | | <running (n+1)b> | | 72 1.21 riastrad * | +--------------------+ | 73 1.21 riastrad * +--------------------------------------------------------------+ 74 1.1 thorpej * 75 1.21 riastrad * XXX Why one dispatcher per CPU? I did that originally to avoid 76 1.1 thorpej * touching remote CPUs' memory when scheduling a job, but that still 77 1.1 thorpej * requires interprocessor synchronization. Perhaps we could get by 78 1.21 riastrad * with a single dispatcher thread, at the expense of another pointer 79 1.21 riastrad * in struct threadpool_job to identify the CPU on which it must run in 80 1.21 riastrad * order for the dispatcher to schedule it correctly. 81 1.1 thorpej */ 82 1.1 thorpej 83 1.1 thorpej #include <sys/cdefs.h> 84 1.23 riastrad __KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.23 2021/01/23 16:33:49 riastradh Exp $"); 85 1.1 thorpej 86 1.1 thorpej #include <sys/types.h> 87 1.1 thorpej #include <sys/param.h> 88 1.1 thorpej #include <sys/atomic.h> 89 1.1 thorpej #include <sys/condvar.h> 90 1.1 thorpej #include <sys/cpu.h> 91 1.1 thorpej #include <sys/kernel.h> 92 1.1 thorpej #include <sys/kmem.h> 93 1.1 thorpej #include <sys/kthread.h> 94 1.1 thorpej #include <sys/mutex.h> 95 1.1 thorpej #include <sys/once.h> 96 1.1 thorpej #include <sys/percpu.h> 97 1.1 thorpej #include <sys/pool.h> 98 1.1 thorpej #include <sys/proc.h> 99 1.1 thorpej #include <sys/queue.h> 100 1.17 riastrad #include <sys/sdt.h> 101 1.17 riastrad #include <sys/sysctl.h> 102 1.1 thorpej #include <sys/systm.h> 103 1.1 thorpej #include <sys/threadpool.h> 104 1.1 thorpej 105 1.17 riastrad /* Probes */ 106 1.17 riastrad 107 1.17 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get, 108 1.17 riastrad "pri_t"/*pri*/); 109 1.17 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__create, 110 1.17 riastrad "pri_t"/*pri*/); 111 1.17 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__race, 112 1.17 riastrad "pri_t"/*pri*/); 113 1.17 riastrad SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put, 114 1.17 riastrad "struct threadpool *"/*pool*/, "pri_t"/*pri*/); 115 1.17 riastrad SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put__destroy, 116 1.17 riastrad "struct threadpool *"/*pool*/, "pri_t"/*pri*/); 117 1.17 riastrad 118 1.17 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get, 119 1.17 riastrad "pri_t"/*pri*/); 120 1.17 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__create, 121 1.17 riastrad "pri_t"/*pri*/); 122 1.17 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__race, 123 1.17 riastrad "pri_t"/*pri*/); 124 1.17 riastrad SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put, 125 1.17 riastrad "struct threadpool *"/*pool*/, "pri_t"/*pri*/); 126 1.17 riastrad SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put__destroy, 127 1.17 riastrad "struct threadpool *"/*pool*/, "pri_t"/*pri*/); 128 1.17 riastrad 129 1.17 riastrad SDT_PROBE_DEFINE2(sdt, kernel, threadpool, create, 130 1.17 riastrad "struct cpu_info *"/*ci*/, "pri_t"/*pri*/); 131 1.17 riastrad SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__success, 132 1.17 riastrad "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "struct threadpool *"/*pool*/); 133 1.17 riastrad SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__failure, 134 1.17 riastrad "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "int"/*error*/); 135 1.17 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, destroy, 136 1.17 riastrad "struct threadpool *"/*pool*/); 137 1.17 riastrad SDT_PROBE_DEFINE2(sdt, kernel, threadpool, destroy__wait, 138 1.17 riastrad "struct threadpool *"/*pool*/, "uint64_t"/*refcnt*/); 139 1.17 riastrad 140 1.17 riastrad SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job, 141 1.17 riastrad "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/); 142 1.17 riastrad SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__running, 143 1.17 riastrad "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/); 144 1.21 riastrad SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__dispatcher, 145 1.17 riastrad "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/); 146 1.17 riastrad SDT_PROBE_DEFINE3(sdt, kernel, threadpool, schedule__job__thread, 147 1.17 riastrad "struct threadpool *"/*pool*/, 148 1.17 riastrad "struct threadpool_job *"/*job*/, 149 1.17 riastrad "struct lwp *"/*thread*/); 150 1.17 riastrad 151 1.21 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__start, 152 1.17 riastrad "struct threadpool *"/*pool*/); 153 1.21 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__dying, 154 1.17 riastrad "struct threadpool *"/*pool*/); 155 1.21 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__spawn, 156 1.17 riastrad "struct threadpool *"/*pool*/); 157 1.21 riastrad SDT_PROBE_DEFINE2(sdt, kernel, threadpool, dispatcher__race, 158 1.17 riastrad "struct threadpool *"/*pool*/, 159 1.17 riastrad "struct threadpool_job *"/*job*/); 160 1.21 riastrad SDT_PROBE_DEFINE3(sdt, kernel, threadpool, dispatcher__assign, 161 1.17 riastrad "struct threadpool *"/*pool*/, 162 1.17 riastrad "struct threadpool_job *"/*job*/, 163 1.17 riastrad "struct lwp *"/*thread*/); 164 1.21 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__exit, 165 1.17 riastrad "struct threadpool *"/*pool*/); 166 1.17 riastrad 167 1.17 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__start, 168 1.17 riastrad "struct threadpool *"/*pool*/); 169 1.17 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__dying, 170 1.17 riastrad "struct threadpool *"/*pool*/); 171 1.17 riastrad SDT_PROBE_DEFINE2(sdt, kernel, threadpool, thread__job, 172 1.17 riastrad "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/); 173 1.17 riastrad SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__exit, 174 1.17 riastrad "struct threadpool *"/*pool*/); 175 1.17 riastrad 176 1.1 thorpej /* Data structures */ 177 1.1 thorpej 178 1.4 thorpej TAILQ_HEAD(job_head, threadpool_job); 179 1.1 thorpej TAILQ_HEAD(thread_head, threadpool_thread); 180 1.1 thorpej 181 1.1 thorpej struct threadpool_thread { 182 1.1 thorpej struct lwp *tpt_lwp; 183 1.12 thorpej char *tpt_lwp_savedname; 184 1.4 thorpej struct threadpool *tpt_pool; 185 1.4 thorpej struct threadpool_job *tpt_job; 186 1.1 thorpej kcondvar_t tpt_cv; 187 1.1 thorpej TAILQ_ENTRY(threadpool_thread) tpt_entry; 188 1.1 thorpej }; 189 1.1 thorpej 190 1.1 thorpej struct threadpool { 191 1.1 thorpej kmutex_t tp_lock; 192 1.21 riastrad struct threadpool_thread tp_dispatcher; 193 1.1 thorpej struct job_head tp_jobs; 194 1.1 thorpej struct thread_head tp_idle_threads; 195 1.7 thorpej uint64_t tp_refcnt; 196 1.1 thorpej int tp_flags; 197 1.1 thorpej #define THREADPOOL_DYING 0x01 198 1.1 thorpej struct cpu_info *tp_cpu; 199 1.1 thorpej pri_t tp_pri; 200 1.1 thorpej }; 201 1.1 thorpej 202 1.7 thorpej static void threadpool_hold(struct threadpool *); 203 1.4 thorpej static void threadpool_rele(struct threadpool *); 204 1.1 thorpej 205 1.4 thorpej static int threadpool_percpu_create(struct threadpool_percpu **, pri_t); 206 1.4 thorpej static void threadpool_percpu_destroy(struct threadpool_percpu *); 207 1.16 riastrad static void threadpool_percpu_init(void *, void *, struct cpu_info *); 208 1.16 riastrad static void threadpool_percpu_ok(void *, void *, struct cpu_info *); 209 1.16 riastrad static void threadpool_percpu_fini(void *, void *, struct cpu_info *); 210 1.1 thorpej 211 1.10 thorpej static threadpool_job_fn_t threadpool_job_dead; 212 1.1 thorpej 213 1.13 thorpej static void threadpool_job_hold(struct threadpool_job *); 214 1.4 thorpej static void threadpool_job_rele(struct threadpool_job *); 215 1.1 thorpej 216 1.21 riastrad static void threadpool_dispatcher_thread(void *) __dead; 217 1.1 thorpej static void threadpool_thread(void *) __dead; 218 1.1 thorpej 219 1.1 thorpej static pool_cache_t threadpool_thread_pc __read_mostly; 220 1.1 thorpej 221 1.1 thorpej static kmutex_t threadpools_lock __cacheline_aligned; 222 1.1 thorpej 223 1.14 thorpej /* Default to 30 second idle timeout for pool threads. */ 224 1.14 thorpej static int threadpool_idle_time_ms = 30 * 1000; 225 1.1 thorpej 226 1.1 thorpej struct threadpool_unbound { 227 1.1 thorpej struct threadpool tpu_pool; 228 1.1 thorpej 229 1.1 thorpej /* protected by threadpools_lock */ 230 1.1 thorpej LIST_ENTRY(threadpool_unbound) tpu_link; 231 1.5 thorpej uint64_t tpu_refcnt; 232 1.1 thorpej }; 233 1.1 thorpej 234 1.1 thorpej static LIST_HEAD(, threadpool_unbound) unbound_threadpools; 235 1.1 thorpej 236 1.1 thorpej static struct threadpool_unbound * 237 1.1 thorpej threadpool_lookup_unbound(pri_t pri) 238 1.1 thorpej { 239 1.1 thorpej struct threadpool_unbound *tpu; 240 1.1 thorpej 241 1.1 thorpej LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) { 242 1.1 thorpej if (tpu->tpu_pool.tp_pri == pri) 243 1.1 thorpej return tpu; 244 1.1 thorpej } 245 1.1 thorpej return NULL; 246 1.1 thorpej } 247 1.1 thorpej 248 1.1 thorpej static void 249 1.1 thorpej threadpool_insert_unbound(struct threadpool_unbound *tpu) 250 1.1 thorpej { 251 1.1 thorpej KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL); 252 1.1 thorpej LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link); 253 1.1 thorpej } 254 1.1 thorpej 255 1.1 thorpej static void 256 1.1 thorpej threadpool_remove_unbound(struct threadpool_unbound *tpu) 257 1.1 thorpej { 258 1.1 thorpej KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu); 259 1.1 thorpej LIST_REMOVE(tpu, tpu_link); 260 1.1 thorpej } 261 1.1 thorpej 262 1.1 thorpej struct threadpool_percpu { 263 1.1 thorpej percpu_t * tpp_percpu; 264 1.1 thorpej pri_t tpp_pri; 265 1.1 thorpej 266 1.1 thorpej /* protected by threadpools_lock */ 267 1.1 thorpej LIST_ENTRY(threadpool_percpu) tpp_link; 268 1.5 thorpej uint64_t tpp_refcnt; 269 1.1 thorpej }; 270 1.1 thorpej 271 1.1 thorpej static LIST_HEAD(, threadpool_percpu) percpu_threadpools; 272 1.1 thorpej 273 1.4 thorpej static struct threadpool_percpu * 274 1.1 thorpej threadpool_lookup_percpu(pri_t pri) 275 1.1 thorpej { 276 1.4 thorpej struct threadpool_percpu *tpp; 277 1.1 thorpej 278 1.1 thorpej LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) { 279 1.1 thorpej if (tpp->tpp_pri == pri) 280 1.1 thorpej return tpp; 281 1.1 thorpej } 282 1.1 thorpej return NULL; 283 1.1 thorpej } 284 1.1 thorpej 285 1.1 thorpej static void 286 1.4 thorpej threadpool_insert_percpu(struct threadpool_percpu *tpp) 287 1.1 thorpej { 288 1.1 thorpej KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL); 289 1.1 thorpej LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link); 290 1.1 thorpej } 291 1.1 thorpej 292 1.1 thorpej static void 293 1.4 thorpej threadpool_remove_percpu(struct threadpool_percpu *tpp) 294 1.1 thorpej { 295 1.1 thorpej KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp); 296 1.1 thorpej LIST_REMOVE(tpp, tpp_link); 297 1.1 thorpej } 298 1.1 thorpej 299 1.14 thorpej static int 300 1.14 thorpej sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS) 301 1.14 thorpej { 302 1.14 thorpej struct sysctlnode node; 303 1.14 thorpej int val, error; 304 1.14 thorpej 305 1.14 thorpej node = *rnode; 306 1.14 thorpej 307 1.14 thorpej val = threadpool_idle_time_ms; 308 1.14 thorpej node.sysctl_data = &val; 309 1.14 thorpej error = sysctl_lookup(SYSCTLFN_CALL(&node)); 310 1.14 thorpej if (error == 0 && newp != NULL) { 311 1.14 thorpej /* Disallow negative values and 0 (forever). */ 312 1.14 thorpej if (val < 1) 313 1.14 thorpej error = EINVAL; 314 1.14 thorpej else 315 1.14 thorpej threadpool_idle_time_ms = val; 316 1.14 thorpej } 317 1.14 thorpej 318 1.14 thorpej return error; 319 1.14 thorpej } 320 1.14 thorpej 321 1.14 thorpej SYSCTL_SETUP_PROTO(sysctl_threadpool_setup); 322 1.14 thorpej 323 1.14 thorpej SYSCTL_SETUP(sysctl_threadpool_setup, 324 1.14 thorpej "sysctl kern.threadpool subtree setup") 325 1.14 thorpej { 326 1.14 thorpej const struct sysctlnode *rnode, *cnode; 327 1.14 thorpej int error __diagused; 328 1.14 thorpej 329 1.14 thorpej error = sysctl_createv(clog, 0, NULL, &rnode, 330 1.14 thorpej CTLFLAG_PERMANENT, 331 1.14 thorpej CTLTYPE_NODE, "threadpool", 332 1.14 thorpej SYSCTL_DESCR("threadpool subsystem options"), 333 1.14 thorpej NULL, 0, NULL, 0, 334 1.14 thorpej CTL_KERN, CTL_CREATE, CTL_EOL); 335 1.14 thorpej KASSERT(error == 0); 336 1.14 thorpej 337 1.14 thorpej error = sysctl_createv(clog, 0, &rnode, &cnode, 338 1.14 thorpej CTLFLAG_PERMANENT | CTLFLAG_READWRITE, 339 1.14 thorpej CTLTYPE_INT, "idle_ms", 340 1.14 thorpej SYSCTL_DESCR("idle thread timeout in ms"), 341 1.14 thorpej sysctl_kern_threadpool_idle_ms, 0, NULL, 0, 342 1.14 thorpej CTL_CREATE, CTL_EOL); 343 1.14 thorpej KASSERT(error == 0); 344 1.14 thorpej } 345 1.14 thorpej 346 1.11 thorpej void 347 1.1 thorpej threadpools_init(void) 348 1.1 thorpej { 349 1.1 thorpej 350 1.1 thorpej threadpool_thread_pc = 351 1.1 thorpej pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0, 352 1.1 thorpej "thplthrd", NULL, IPL_NONE, NULL, NULL, NULL); 353 1.1 thorpej 354 1.1 thorpej LIST_INIT(&unbound_threadpools); 355 1.1 thorpej LIST_INIT(&percpu_threadpools); 356 1.1 thorpej mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE); 357 1.1 thorpej } 358 1.1 thorpej 359 1.21 riastrad static void 360 1.21 riastrad threadnamesuffix(char *buf, size_t buflen, struct cpu_info *ci, int pri) 361 1.21 riastrad { 362 1.21 riastrad 363 1.21 riastrad buf[0] = '\0'; 364 1.21 riastrad if (ci) 365 1.21 riastrad snprintf(buf + strlen(buf), buflen - strlen(buf), "/%d", 366 1.21 riastrad cpu_index(ci)); 367 1.21 riastrad if (pri != PRI_NONE) 368 1.21 riastrad snprintf(buf + strlen(buf), buflen - strlen(buf), "@%d", pri); 369 1.21 riastrad } 370 1.21 riastrad 371 1.1 thorpej /* Thread pool creation */ 372 1.1 thorpej 373 1.1 thorpej static bool 374 1.1 thorpej threadpool_pri_is_valid(pri_t pri) 375 1.1 thorpej { 376 1.1 thorpej return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT)); 377 1.1 thorpej } 378 1.1 thorpej 379 1.1 thorpej static int 380 1.6 thorpej threadpool_create(struct threadpool *const pool, struct cpu_info *ci, 381 1.6 thorpej pri_t pri) 382 1.1 thorpej { 383 1.1 thorpej struct lwp *lwp; 384 1.21 riastrad char suffix[16]; 385 1.1 thorpej int ktflags; 386 1.1 thorpej int error; 387 1.1 thorpej 388 1.1 thorpej KASSERT(threadpool_pri_is_valid(pri)); 389 1.1 thorpej 390 1.17 riastrad SDT_PROBE2(sdt, kernel, threadpool, create, ci, pri); 391 1.17 riastrad 392 1.1 thorpej mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM); 393 1.21 riastrad /* XXX dispatcher */ 394 1.1 thorpej TAILQ_INIT(&pool->tp_jobs); 395 1.1 thorpej TAILQ_INIT(&pool->tp_idle_threads); 396 1.21 riastrad pool->tp_refcnt = 1; /* dispatcher's reference */ 397 1.1 thorpej pool->tp_flags = 0; 398 1.1 thorpej pool->tp_cpu = ci; 399 1.1 thorpej pool->tp_pri = pri; 400 1.1 thorpej 401 1.21 riastrad pool->tp_dispatcher.tpt_lwp = NULL; 402 1.21 riastrad pool->tp_dispatcher.tpt_pool = pool; 403 1.21 riastrad pool->tp_dispatcher.tpt_job = NULL; 404 1.21 riastrad cv_init(&pool->tp_dispatcher.tpt_cv, "pooldisp"); 405 1.1 thorpej 406 1.1 thorpej ktflags = 0; 407 1.1 thorpej ktflags |= KTHREAD_MPSAFE; 408 1.1 thorpej if (pri < PRI_KERNEL) 409 1.1 thorpej ktflags |= KTHREAD_TS; 410 1.21 riastrad threadnamesuffix(suffix, sizeof(suffix), ci, pri); 411 1.21 riastrad error = kthread_create(pri, ktflags, ci, &threadpool_dispatcher_thread, 412 1.21 riastrad &pool->tp_dispatcher, &lwp, "pooldisp%s", suffix); 413 1.1 thorpej if (error) 414 1.1 thorpej goto fail0; 415 1.1 thorpej 416 1.1 thorpej mutex_spin_enter(&pool->tp_lock); 417 1.21 riastrad pool->tp_dispatcher.tpt_lwp = lwp; 418 1.21 riastrad cv_broadcast(&pool->tp_dispatcher.tpt_cv); 419 1.1 thorpej mutex_spin_exit(&pool->tp_lock); 420 1.1 thorpej 421 1.17 riastrad SDT_PROBE3(sdt, kernel, threadpool, create__success, ci, pri, pool); 422 1.1 thorpej return 0; 423 1.1 thorpej 424 1.1 thorpej fail0: KASSERT(error); 425 1.21 riastrad KASSERT(pool->tp_dispatcher.tpt_job == NULL); 426 1.21 riastrad KASSERT(pool->tp_dispatcher.tpt_pool == pool); 427 1.1 thorpej KASSERT(pool->tp_flags == 0); 428 1.1 thorpej KASSERT(pool->tp_refcnt == 0); 429 1.1 thorpej KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads)); 430 1.1 thorpej KASSERT(TAILQ_EMPTY(&pool->tp_jobs)); 431 1.21 riastrad KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv)); 432 1.21 riastrad cv_destroy(&pool->tp_dispatcher.tpt_cv); 433 1.1 thorpej mutex_destroy(&pool->tp_lock); 434 1.17 riastrad SDT_PROBE3(sdt, kernel, threadpool, create__failure, ci, pri, error); 435 1.1 thorpej return error; 436 1.1 thorpej } 437 1.1 thorpej 438 1.1 thorpej /* Thread pool destruction */ 439 1.1 thorpej 440 1.1 thorpej static void 441 1.6 thorpej threadpool_destroy(struct threadpool *pool) 442 1.1 thorpej { 443 1.1 thorpej struct threadpool_thread *thread; 444 1.1 thorpej 445 1.17 riastrad SDT_PROBE1(sdt, kernel, threadpool, destroy, pool); 446 1.17 riastrad 447 1.1 thorpej /* Mark the pool dying and wait for threads to commit suicide. */ 448 1.1 thorpej mutex_spin_enter(&pool->tp_lock); 449 1.1 thorpej KASSERT(TAILQ_EMPTY(&pool->tp_jobs)); 450 1.1 thorpej pool->tp_flags |= THREADPOOL_DYING; 451 1.21 riastrad cv_broadcast(&pool->tp_dispatcher.tpt_cv); 452 1.1 thorpej TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry) 453 1.1 thorpej cv_broadcast(&thread->tpt_cv); 454 1.1 thorpej while (0 < pool->tp_refcnt) { 455 1.17 riastrad SDT_PROBE2(sdt, kernel, threadpool, destroy__wait, 456 1.17 riastrad pool, pool->tp_refcnt); 457 1.21 riastrad cv_wait(&pool->tp_dispatcher.tpt_cv, &pool->tp_lock); 458 1.1 thorpej } 459 1.1 thorpej mutex_spin_exit(&pool->tp_lock); 460 1.1 thorpej 461 1.21 riastrad KASSERT(pool->tp_dispatcher.tpt_job == NULL); 462 1.21 riastrad KASSERT(pool->tp_dispatcher.tpt_pool == pool); 463 1.1 thorpej KASSERT(pool->tp_flags == THREADPOOL_DYING); 464 1.1 thorpej KASSERT(pool->tp_refcnt == 0); 465 1.1 thorpej KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads)); 466 1.1 thorpej KASSERT(TAILQ_EMPTY(&pool->tp_jobs)); 467 1.21 riastrad KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv)); 468 1.21 riastrad cv_destroy(&pool->tp_dispatcher.tpt_cv); 469 1.1 thorpej mutex_destroy(&pool->tp_lock); 470 1.1 thorpej } 471 1.1 thorpej 472 1.7 thorpej static void 473 1.4 thorpej threadpool_hold(struct threadpool *pool) 474 1.1 thorpej { 475 1.1 thorpej 476 1.7 thorpej KASSERT(mutex_owned(&pool->tp_lock)); 477 1.7 thorpej pool->tp_refcnt++; 478 1.7 thorpej KASSERT(pool->tp_refcnt != 0); 479 1.1 thorpej } 480 1.1 thorpej 481 1.1 thorpej static void 482 1.4 thorpej threadpool_rele(struct threadpool *pool) 483 1.1 thorpej { 484 1.1 thorpej 485 1.7 thorpej KASSERT(mutex_owned(&pool->tp_lock)); 486 1.7 thorpej KASSERT(0 < pool->tp_refcnt); 487 1.8 thorpej if (--pool->tp_refcnt == 0) 488 1.21 riastrad cv_broadcast(&pool->tp_dispatcher.tpt_cv); 489 1.1 thorpej } 490 1.1 thorpej 491 1.1 thorpej /* Unbound thread pools */ 492 1.1 thorpej 493 1.1 thorpej int 494 1.4 thorpej threadpool_get(struct threadpool **poolp, pri_t pri) 495 1.1 thorpej { 496 1.1 thorpej struct threadpool_unbound *tpu, *tmp = NULL; 497 1.1 thorpej int error; 498 1.1 thorpej 499 1.1 thorpej ASSERT_SLEEPABLE(); 500 1.1 thorpej 501 1.17 riastrad SDT_PROBE1(sdt, kernel, threadpool, get, pri); 502 1.17 riastrad 503 1.1 thorpej if (! threadpool_pri_is_valid(pri)) 504 1.1 thorpej return EINVAL; 505 1.1 thorpej 506 1.1 thorpej mutex_enter(&threadpools_lock); 507 1.1 thorpej tpu = threadpool_lookup_unbound(pri); 508 1.1 thorpej if (tpu == NULL) { 509 1.1 thorpej mutex_exit(&threadpools_lock); 510 1.17 riastrad SDT_PROBE1(sdt, kernel, threadpool, get__create, pri); 511 1.6 thorpej tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP); 512 1.6 thorpej error = threadpool_create(&tmp->tpu_pool, NULL, pri); 513 1.6 thorpej if (error) { 514 1.6 thorpej kmem_free(tmp, sizeof(*tmp)); 515 1.1 thorpej return error; 516 1.6 thorpej } 517 1.1 thorpej mutex_enter(&threadpools_lock); 518 1.1 thorpej tpu = threadpool_lookup_unbound(pri); 519 1.1 thorpej if (tpu == NULL) { 520 1.1 thorpej tpu = tmp; 521 1.1 thorpej tmp = NULL; 522 1.1 thorpej threadpool_insert_unbound(tpu); 523 1.17 riastrad } else { 524 1.17 riastrad SDT_PROBE1(sdt, kernel, threadpool, get__race, pri); 525 1.1 thorpej } 526 1.1 thorpej } 527 1.1 thorpej KASSERT(tpu != NULL); 528 1.1 thorpej tpu->tpu_refcnt++; 529 1.5 thorpej KASSERT(tpu->tpu_refcnt != 0); 530 1.1 thorpej mutex_exit(&threadpools_lock); 531 1.1 thorpej 532 1.6 thorpej if (tmp != NULL) { 533 1.6 thorpej threadpool_destroy(&tmp->tpu_pool); 534 1.6 thorpej kmem_free(tmp, sizeof(*tmp)); 535 1.6 thorpej } 536 1.1 thorpej KASSERT(tpu != NULL); 537 1.1 thorpej *poolp = &tpu->tpu_pool; 538 1.1 thorpej return 0; 539 1.1 thorpej } 540 1.1 thorpej 541 1.1 thorpej void 542 1.4 thorpej threadpool_put(struct threadpool *pool, pri_t pri) 543 1.1 thorpej { 544 1.1 thorpej struct threadpool_unbound *tpu = 545 1.1 thorpej container_of(pool, struct threadpool_unbound, tpu_pool); 546 1.1 thorpej 547 1.1 thorpej ASSERT_SLEEPABLE(); 548 1.17 riastrad KASSERT(threadpool_pri_is_valid(pri)); 549 1.1 thorpej 550 1.17 riastrad SDT_PROBE2(sdt, kernel, threadpool, put, pool, pri); 551 1.1 thorpej 552 1.1 thorpej mutex_enter(&threadpools_lock); 553 1.1 thorpej KASSERT(tpu == threadpool_lookup_unbound(pri)); 554 1.1 thorpej KASSERT(0 < tpu->tpu_refcnt); 555 1.1 thorpej if (--tpu->tpu_refcnt == 0) { 556 1.17 riastrad SDT_PROBE2(sdt, kernel, threadpool, put__destroy, pool, pri); 557 1.1 thorpej threadpool_remove_unbound(tpu); 558 1.5 thorpej } else { 559 1.1 thorpej tpu = NULL; 560 1.5 thorpej } 561 1.1 thorpej mutex_exit(&threadpools_lock); 562 1.1 thorpej 563 1.6 thorpej if (tpu) { 564 1.6 thorpej threadpool_destroy(&tpu->tpu_pool); 565 1.6 thorpej kmem_free(tpu, sizeof(*tpu)); 566 1.6 thorpej } 567 1.1 thorpej } 568 1.1 thorpej 569 1.1 thorpej /* Per-CPU thread pools */ 570 1.1 thorpej 571 1.1 thorpej int 572 1.4 thorpej threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri) 573 1.1 thorpej { 574 1.4 thorpej struct threadpool_percpu *pool_percpu, *tmp = NULL; 575 1.1 thorpej int error; 576 1.1 thorpej 577 1.1 thorpej ASSERT_SLEEPABLE(); 578 1.1 thorpej 579 1.17 riastrad SDT_PROBE1(sdt, kernel, threadpool, percpu__get, pri); 580 1.17 riastrad 581 1.1 thorpej if (! threadpool_pri_is_valid(pri)) 582 1.1 thorpej return EINVAL; 583 1.1 thorpej 584 1.1 thorpej mutex_enter(&threadpools_lock); 585 1.1 thorpej pool_percpu = threadpool_lookup_percpu(pri); 586 1.1 thorpej if (pool_percpu == NULL) { 587 1.1 thorpej mutex_exit(&threadpools_lock); 588 1.17 riastrad SDT_PROBE1(sdt, kernel, threadpool, percpu__get__create, pri); 589 1.1 thorpej error = threadpool_percpu_create(&tmp, pri); 590 1.1 thorpej if (error) 591 1.1 thorpej return error; 592 1.1 thorpej KASSERT(tmp != NULL); 593 1.1 thorpej mutex_enter(&threadpools_lock); 594 1.1 thorpej pool_percpu = threadpool_lookup_percpu(pri); 595 1.1 thorpej if (pool_percpu == NULL) { 596 1.1 thorpej pool_percpu = tmp; 597 1.1 thorpej tmp = NULL; 598 1.1 thorpej threadpool_insert_percpu(pool_percpu); 599 1.17 riastrad } else { 600 1.17 riastrad SDT_PROBE1(sdt, kernel, threadpool, percpu__get__race, 601 1.17 riastrad pri); 602 1.1 thorpej } 603 1.1 thorpej } 604 1.1 thorpej KASSERT(pool_percpu != NULL); 605 1.1 thorpej pool_percpu->tpp_refcnt++; 606 1.5 thorpej KASSERT(pool_percpu->tpp_refcnt != 0); 607 1.1 thorpej mutex_exit(&threadpools_lock); 608 1.1 thorpej 609 1.1 thorpej if (tmp != NULL) 610 1.1 thorpej threadpool_percpu_destroy(tmp); 611 1.1 thorpej KASSERT(pool_percpu != NULL); 612 1.1 thorpej *pool_percpup = pool_percpu; 613 1.1 thorpej return 0; 614 1.1 thorpej } 615 1.1 thorpej 616 1.1 thorpej void 617 1.4 thorpej threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri) 618 1.1 thorpej { 619 1.1 thorpej 620 1.1 thorpej ASSERT_SLEEPABLE(); 621 1.1 thorpej 622 1.1 thorpej KASSERT(threadpool_pri_is_valid(pri)); 623 1.1 thorpej 624 1.17 riastrad SDT_PROBE2(sdt, kernel, threadpool, percpu__put, pool_percpu, pri); 625 1.17 riastrad 626 1.1 thorpej mutex_enter(&threadpools_lock); 627 1.1 thorpej KASSERT(pool_percpu == threadpool_lookup_percpu(pri)); 628 1.1 thorpej KASSERT(0 < pool_percpu->tpp_refcnt); 629 1.1 thorpej if (--pool_percpu->tpp_refcnt == 0) { 630 1.17 riastrad SDT_PROBE2(sdt, kernel, threadpool, percpu__put__destroy, 631 1.17 riastrad pool_percpu, pri); 632 1.1 thorpej threadpool_remove_percpu(pool_percpu); 633 1.5 thorpej } else { 634 1.1 thorpej pool_percpu = NULL; 635 1.5 thorpej } 636 1.1 thorpej mutex_exit(&threadpools_lock); 637 1.1 thorpej 638 1.1 thorpej if (pool_percpu) 639 1.1 thorpej threadpool_percpu_destroy(pool_percpu); 640 1.1 thorpej } 641 1.1 thorpej 642 1.4 thorpej struct threadpool * 643 1.4 thorpej threadpool_percpu_ref(struct threadpool_percpu *pool_percpu) 644 1.1 thorpej { 645 1.4 thorpej struct threadpool **poolp, *pool; 646 1.1 thorpej 647 1.1 thorpej poolp = percpu_getref(pool_percpu->tpp_percpu); 648 1.1 thorpej pool = *poolp; 649 1.1 thorpej percpu_putref(pool_percpu->tpp_percpu); 650 1.1 thorpej 651 1.1 thorpej return pool; 652 1.1 thorpej } 653 1.1 thorpej 654 1.4 thorpej struct threadpool * 655 1.4 thorpej threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu, 656 1.1 thorpej struct cpu_info *ci) 657 1.1 thorpej { 658 1.4 thorpej struct threadpool **poolp, *pool; 659 1.1 thorpej 660 1.20 riastrad /* 661 1.20 riastrad * As long as xcalls are blocked -- e.g., by kpreempt_disable 662 1.20 riastrad * -- the percpu object will not be swapped and destroyed. We 663 1.20 riastrad * can't write to it, because the data may have already been 664 1.20 riastrad * moved to a new buffer, but we can safely read from it. 665 1.20 riastrad */ 666 1.20 riastrad kpreempt_disable(); 667 1.1 thorpej poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci); 668 1.1 thorpej pool = *poolp; 669 1.20 riastrad kpreempt_enable(); 670 1.1 thorpej 671 1.1 thorpej return pool; 672 1.1 thorpej } 673 1.1 thorpej 674 1.1 thorpej static int 675 1.4 thorpej threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri) 676 1.1 thorpej { 677 1.4 thorpej struct threadpool_percpu *pool_percpu; 678 1.16 riastrad bool ok = true; 679 1.1 thorpej 680 1.1 thorpej pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP); 681 1.1 thorpej pool_percpu->tpp_pri = pri; 682 1.16 riastrad pool_percpu->tpp_percpu = percpu_create(sizeof(struct threadpool *), 683 1.16 riastrad threadpool_percpu_init, threadpool_percpu_fini, 684 1.16 riastrad (void *)(intptr_t)pri); 685 1.1 thorpej 686 1.16 riastrad /* 687 1.16 riastrad * Verify that all of the CPUs were initialized. 688 1.16 riastrad * 689 1.16 riastrad * XXX What to do if we add CPU hotplug? 690 1.16 riastrad */ 691 1.16 riastrad percpu_foreach(pool_percpu->tpp_percpu, &threadpool_percpu_ok, &ok); 692 1.16 riastrad if (!ok) 693 1.16 riastrad goto fail; 694 1.1 thorpej 695 1.1 thorpej /* Success! */ 696 1.4 thorpej *pool_percpup = (struct threadpool_percpu *)pool_percpu; 697 1.1 thorpej return 0; 698 1.1 thorpej 699 1.16 riastrad fail: percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *)); 700 1.16 riastrad kmem_free(pool_percpu, sizeof(*pool_percpu)); 701 1.16 riastrad return ENOMEM; 702 1.1 thorpej } 703 1.1 thorpej 704 1.1 thorpej static void 705 1.4 thorpej threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu) 706 1.1 thorpej { 707 1.1 thorpej 708 1.16 riastrad percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *)); 709 1.16 riastrad kmem_free(pool_percpu, sizeof(*pool_percpu)); 710 1.16 riastrad } 711 1.16 riastrad 712 1.16 riastrad static void 713 1.16 riastrad threadpool_percpu_init(void *vpoolp, void *vpri, struct cpu_info *ci) 714 1.16 riastrad { 715 1.16 riastrad struct threadpool **const poolp = vpoolp; 716 1.16 riastrad pri_t pri = (intptr_t)(void *)vpri; 717 1.16 riastrad int error; 718 1.16 riastrad 719 1.16 riastrad *poolp = kmem_zalloc(sizeof(**poolp), KM_SLEEP); 720 1.16 riastrad error = threadpool_create(*poolp, ci, pri); 721 1.16 riastrad if (error) { 722 1.16 riastrad KASSERT(error == ENOMEM); 723 1.16 riastrad kmem_free(*poolp, sizeof(**poolp)); 724 1.16 riastrad *poolp = NULL; 725 1.1 thorpej } 726 1.16 riastrad } 727 1.16 riastrad 728 1.16 riastrad static void 729 1.16 riastrad threadpool_percpu_ok(void *vpoolp, void *vokp, struct cpu_info *ci) 730 1.16 riastrad { 731 1.16 riastrad struct threadpool **const poolp = vpoolp; 732 1.16 riastrad bool *okp = vokp; 733 1.16 riastrad 734 1.16 riastrad if (*poolp == NULL) 735 1.16 riastrad atomic_store_relaxed(okp, false); 736 1.16 riastrad } 737 1.1 thorpej 738 1.16 riastrad static void 739 1.16 riastrad threadpool_percpu_fini(void *vpoolp, void *vprip, struct cpu_info *ci) 740 1.16 riastrad { 741 1.16 riastrad struct threadpool **const poolp = vpoolp; 742 1.16 riastrad 743 1.16 riastrad if (*poolp == NULL) /* initialization failed */ 744 1.16 riastrad return; 745 1.16 riastrad threadpool_destroy(*poolp); 746 1.16 riastrad kmem_free(*poolp, sizeof(**poolp)); 747 1.1 thorpej } 748 1.1 thorpej 749 1.1 thorpej /* Thread pool jobs */ 750 1.1 thorpej 751 1.1 thorpej void __printflike(4,5) 752 1.4 thorpej threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn, 753 1.1 thorpej kmutex_t *lock, const char *fmt, ...) 754 1.1 thorpej { 755 1.1 thorpej va_list ap; 756 1.1 thorpej 757 1.1 thorpej va_start(ap, fmt); 758 1.1 thorpej (void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap); 759 1.1 thorpej va_end(ap); 760 1.1 thorpej 761 1.1 thorpej job->job_lock = lock; 762 1.1 thorpej job->job_thread = NULL; 763 1.1 thorpej job->job_refcnt = 0; 764 1.1 thorpej cv_init(&job->job_cv, job->job_name); 765 1.1 thorpej job->job_fn = fn; 766 1.1 thorpej } 767 1.1 thorpej 768 1.1 thorpej static void 769 1.4 thorpej threadpool_job_dead(struct threadpool_job *job) 770 1.1 thorpej { 771 1.1 thorpej 772 1.4 thorpej panic("threadpool job %p ran after destruction", job); 773 1.1 thorpej } 774 1.1 thorpej 775 1.1 thorpej void 776 1.4 thorpej threadpool_job_destroy(struct threadpool_job *job) 777 1.1 thorpej { 778 1.1 thorpej 779 1.1 thorpej ASSERT_SLEEPABLE(); 780 1.1 thorpej 781 1.1 thorpej KASSERTMSG((job->job_thread == NULL), "job %p still running", job); 782 1.1 thorpej 783 1.1 thorpej mutex_enter(job->job_lock); 784 1.19 riastrad while (0 < atomic_load_relaxed(&job->job_refcnt)) 785 1.1 thorpej cv_wait(&job->job_cv, job->job_lock); 786 1.1 thorpej mutex_exit(job->job_lock); 787 1.1 thorpej 788 1.1 thorpej job->job_lock = NULL; 789 1.1 thorpej KASSERT(job->job_thread == NULL); 790 1.1 thorpej KASSERT(job->job_refcnt == 0); 791 1.1 thorpej KASSERT(!cv_has_waiters(&job->job_cv)); 792 1.1 thorpej cv_destroy(&job->job_cv); 793 1.1 thorpej job->job_fn = threadpool_job_dead; 794 1.1 thorpej (void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name)); 795 1.1 thorpej } 796 1.1 thorpej 797 1.13 thorpej static void 798 1.4 thorpej threadpool_job_hold(struct threadpool_job *job) 799 1.1 thorpej { 800 1.19 riastrad unsigned int refcnt __diagused; 801 1.9 thorpej 802 1.19 riastrad refcnt = atomic_inc_uint_nv(&job->job_refcnt); 803 1.19 riastrad KASSERT(refcnt != 0); 804 1.1 thorpej } 805 1.1 thorpej 806 1.1 thorpej static void 807 1.4 thorpej threadpool_job_rele(struct threadpool_job *job) 808 1.1 thorpej { 809 1.1 thorpej unsigned int refcnt; 810 1.1 thorpej 811 1.13 thorpej KASSERT(mutex_owned(job->job_lock)); 812 1.13 thorpej 813 1.19 riastrad refcnt = atomic_dec_uint_nv(&job->job_refcnt); 814 1.19 riastrad KASSERT(refcnt != UINT_MAX); 815 1.19 riastrad if (refcnt == 0) 816 1.19 riastrad cv_broadcast(&job->job_cv); 817 1.1 thorpej } 818 1.1 thorpej 819 1.1 thorpej void 820 1.4 thorpej threadpool_job_done(struct threadpool_job *job) 821 1.1 thorpej { 822 1.1 thorpej 823 1.1 thorpej KASSERT(mutex_owned(job->job_lock)); 824 1.1 thorpej KASSERT(job->job_thread != NULL); 825 1.1 thorpej KASSERT(job->job_thread->tpt_lwp == curlwp); 826 1.1 thorpej 827 1.12 thorpej /* 828 1.12 thorpej * We can safely read this field; it's only modified right before 829 1.12 thorpej * we call the job work function, and we are only preserving it 830 1.12 thorpej * to use here; no one cares if it contains junk afterward. 831 1.12 thorpej */ 832 1.12 thorpej lwp_lock(curlwp); 833 1.12 thorpej curlwp->l_name = job->job_thread->tpt_lwp_savedname; 834 1.12 thorpej lwp_unlock(curlwp); 835 1.12 thorpej 836 1.13 thorpej /* 837 1.13 thorpej * Inline the work of threadpool_job_rele(); the job is already 838 1.13 thorpej * locked, the most likely scenario (XXXJRT only scenario?) is 839 1.13 thorpej * that we're dropping the last reference (the one taken in 840 1.13 thorpej * threadpool_schedule_job()), and we always do the cv_broadcast() 841 1.13 thorpej * anyway. 842 1.13 thorpej */ 843 1.19 riastrad KASSERT(0 < atomic_load_relaxed(&job->job_refcnt)); 844 1.13 thorpej unsigned int refcnt __diagused = atomic_dec_uint_nv(&job->job_refcnt); 845 1.13 thorpej KASSERT(refcnt != UINT_MAX); 846 1.1 thorpej cv_broadcast(&job->job_cv); 847 1.1 thorpej job->job_thread = NULL; 848 1.1 thorpej } 849 1.1 thorpej 850 1.1 thorpej void 851 1.4 thorpej threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job) 852 1.1 thorpej { 853 1.1 thorpej 854 1.1 thorpej KASSERT(mutex_owned(job->job_lock)); 855 1.1 thorpej 856 1.17 riastrad SDT_PROBE2(sdt, kernel, threadpool, schedule__job, pool, job); 857 1.17 riastrad 858 1.1 thorpej /* 859 1.1 thorpej * If the job's already running, let it keep running. The job 860 1.1 thorpej * is guaranteed by the interlock not to end early -- if it had 861 1.1 thorpej * ended early, threadpool_job_done would have set job_thread 862 1.1 thorpej * to NULL under the interlock. 863 1.1 thorpej */ 864 1.1 thorpej if (__predict_true(job->job_thread != NULL)) { 865 1.17 riastrad SDT_PROBE2(sdt, kernel, threadpool, schedule__job__running, 866 1.17 riastrad pool, job); 867 1.1 thorpej return; 868 1.1 thorpej } 869 1.1 thorpej 870 1.13 thorpej threadpool_job_hold(job); 871 1.13 thorpej 872 1.1 thorpej /* Otherwise, try to assign a thread to the job. */ 873 1.1 thorpej mutex_spin_enter(&pool->tp_lock); 874 1.1 thorpej if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) { 875 1.21 riastrad /* Nobody's idle. Give it to the dispatcher. */ 876 1.21 riastrad SDT_PROBE2(sdt, kernel, threadpool, schedule__job__dispatcher, 877 1.17 riastrad pool, job); 878 1.21 riastrad job->job_thread = &pool->tp_dispatcher; 879 1.1 thorpej TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry); 880 1.1 thorpej } else { 881 1.1 thorpej /* Assign it to the first idle thread. */ 882 1.1 thorpej job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads); 883 1.17 riastrad SDT_PROBE3(sdt, kernel, threadpool, schedule__job__thread, 884 1.17 riastrad pool, job, job->job_thread->tpt_lwp); 885 1.1 thorpej TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread, 886 1.1 thorpej tpt_entry); 887 1.1 thorpej job->job_thread->tpt_job = job; 888 1.1 thorpej } 889 1.1 thorpej 890 1.21 riastrad /* Notify whomever we gave it to, dispatcher or idle thread. */ 891 1.1 thorpej KASSERT(job->job_thread != NULL); 892 1.1 thorpej cv_broadcast(&job->job_thread->tpt_cv); 893 1.1 thorpej mutex_spin_exit(&pool->tp_lock); 894 1.1 thorpej } 895 1.1 thorpej 896 1.1 thorpej bool 897 1.4 thorpej threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job) 898 1.1 thorpej { 899 1.1 thorpej 900 1.1 thorpej KASSERT(mutex_owned(job->job_lock)); 901 1.1 thorpej 902 1.1 thorpej /* 903 1.1 thorpej * XXXJRT This fails (albeit safely) when all of the following 904 1.1 thorpej * are true: 905 1.1 thorpej * 906 1.1 thorpej * => "pool" is something other than what the job was 907 1.1 thorpej * scheduled on. This can legitimately occur if, 908 1.1 thorpej * for example, a job is percpu-scheduled on CPU0 909 1.1 thorpej * and then CPU1 attempts to cancel it without taking 910 1.1 thorpej * a remote pool reference. (this might happen by 911 1.1 thorpej * "luck of the draw"). 912 1.1 thorpej * 913 1.1 thorpej * => "job" is not yet running, but is assigned to the 914 1.21 riastrad * dispatcher. 915 1.1 thorpej * 916 1.1 thorpej * When this happens, this code makes the determination that 917 1.1 thorpej * the job is already running. The failure mode is that the 918 1.1 thorpej * caller is told the job is running, and thus has to wait. 919 1.21 riastrad * The dispatcher will eventually get to it and the job will 920 1.1 thorpej * proceed as if it had been already running. 921 1.1 thorpej */ 922 1.1 thorpej 923 1.1 thorpej if (job->job_thread == NULL) { 924 1.1 thorpej /* Nothing to do. Guaranteed not running. */ 925 1.1 thorpej return true; 926 1.21 riastrad } else if (job->job_thread == &pool->tp_dispatcher) { 927 1.1 thorpej /* Take it off the list to guarantee it won't run. */ 928 1.1 thorpej job->job_thread = NULL; 929 1.1 thorpej mutex_spin_enter(&pool->tp_lock); 930 1.1 thorpej TAILQ_REMOVE(&pool->tp_jobs, job, job_entry); 931 1.1 thorpej mutex_spin_exit(&pool->tp_lock); 932 1.13 thorpej threadpool_job_rele(job); 933 1.1 thorpej return true; 934 1.1 thorpej } else { 935 1.1 thorpej /* Too late -- already running. */ 936 1.1 thorpej return false; 937 1.1 thorpej } 938 1.1 thorpej } 939 1.1 thorpej 940 1.1 thorpej void 941 1.4 thorpej threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job) 942 1.1 thorpej { 943 1.1 thorpej 944 1.18 thorpej /* 945 1.18 thorpej * We may sleep here, but we can't ASSERT_SLEEPABLE() because 946 1.18 thorpej * the job lock (used to interlock the cv_wait()) may in fact 947 1.18 thorpej * legitimately be a spin lock, so the assertion would fire 948 1.18 thorpej * as a false-positive. 949 1.18 thorpej */ 950 1.1 thorpej 951 1.1 thorpej KASSERT(mutex_owned(job->job_lock)); 952 1.1 thorpej 953 1.4 thorpej if (threadpool_cancel_job_async(pool, job)) 954 1.1 thorpej return; 955 1.1 thorpej 956 1.1 thorpej /* Already running. Wait for it to complete. */ 957 1.1 thorpej while (job->job_thread != NULL) 958 1.1 thorpej cv_wait(&job->job_cv, job->job_lock); 959 1.1 thorpej } 960 1.1 thorpej 961 1.21 riastrad /* Thread pool dispatcher thread */ 962 1.1 thorpej 963 1.1 thorpej static void __dead 964 1.21 riastrad threadpool_dispatcher_thread(void *arg) 965 1.1 thorpej { 966 1.21 riastrad struct threadpool_thread *const dispatcher = arg; 967 1.21 riastrad struct threadpool *const pool = dispatcher->tpt_pool; 968 1.1 thorpej struct lwp *lwp = NULL; 969 1.1 thorpej int ktflags; 970 1.21 riastrad char suffix[16]; 971 1.1 thorpej int error; 972 1.1 thorpej 973 1.1 thorpej KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu())); 974 1.17 riastrad KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND)); 975 1.1 thorpej 976 1.1 thorpej /* Wait until we're initialized. */ 977 1.1 thorpej mutex_spin_enter(&pool->tp_lock); 978 1.21 riastrad while (dispatcher->tpt_lwp == NULL) 979 1.21 riastrad cv_wait(&dispatcher->tpt_cv, &pool->tp_lock); 980 1.1 thorpej 981 1.21 riastrad SDT_PROBE1(sdt, kernel, threadpool, dispatcher__start, pool); 982 1.1 thorpej 983 1.1 thorpej for (;;) { 984 1.1 thorpej /* Wait until there's a job. */ 985 1.1 thorpej while (TAILQ_EMPTY(&pool->tp_jobs)) { 986 1.1 thorpej if (ISSET(pool->tp_flags, THREADPOOL_DYING)) { 987 1.17 riastrad SDT_PROBE1(sdt, kernel, threadpool, 988 1.21 riastrad dispatcher__dying, pool); 989 1.1 thorpej break; 990 1.1 thorpej } 991 1.21 riastrad cv_wait(&dispatcher->tpt_cv, &pool->tp_lock); 992 1.1 thorpej } 993 1.1 thorpej if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs))) 994 1.1 thorpej break; 995 1.1 thorpej 996 1.1 thorpej /* If there are no threads, we'll have to try to start one. */ 997 1.1 thorpej if (TAILQ_EMPTY(&pool->tp_idle_threads)) { 998 1.21 riastrad SDT_PROBE1(sdt, kernel, threadpool, dispatcher__spawn, 999 1.17 riastrad pool); 1000 1.7 thorpej threadpool_hold(pool); 1001 1.1 thorpej mutex_spin_exit(&pool->tp_lock); 1002 1.1 thorpej 1003 1.1 thorpej struct threadpool_thread *const thread = 1004 1.1 thorpej pool_cache_get(threadpool_thread_pc, PR_WAITOK); 1005 1.1 thorpej thread->tpt_lwp = NULL; 1006 1.1 thorpej thread->tpt_pool = pool; 1007 1.1 thorpej thread->tpt_job = NULL; 1008 1.21 riastrad cv_init(&thread->tpt_cv, "pooljob"); 1009 1.1 thorpej 1010 1.1 thorpej ktflags = 0; 1011 1.1 thorpej ktflags |= KTHREAD_MPSAFE; 1012 1.1 thorpej if (pool->tp_pri < PRI_KERNEL) 1013 1.1 thorpej ktflags |= KTHREAD_TS; 1014 1.21 riastrad threadnamesuffix(suffix, sizeof(suffix), pool->tp_cpu, 1015 1.21 riastrad pool->tp_pri); 1016 1.1 thorpej error = kthread_create(pool->tp_pri, ktflags, 1017 1.1 thorpej pool->tp_cpu, &threadpool_thread, thread, &lwp, 1018 1.21 riastrad "poolthread%s", suffix); 1019 1.1 thorpej 1020 1.1 thorpej mutex_spin_enter(&pool->tp_lock); 1021 1.1 thorpej if (error) { 1022 1.1 thorpej pool_cache_put(threadpool_thread_pc, thread); 1023 1.1 thorpej threadpool_rele(pool); 1024 1.1 thorpej /* XXX What to do to wait for memory? */ 1025 1.1 thorpej (void)kpause("thrdplcr", false, hz, 1026 1.1 thorpej &pool->tp_lock); 1027 1.1 thorpej continue; 1028 1.1 thorpej } 1029 1.7 thorpej /* 1030 1.7 thorpej * New kthread now owns the reference to the pool 1031 1.7 thorpej * taken above. 1032 1.7 thorpej */ 1033 1.1 thorpej KASSERT(lwp != NULL); 1034 1.1 thorpej TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, 1035 1.1 thorpej tpt_entry); 1036 1.1 thorpej thread->tpt_lwp = lwp; 1037 1.1 thorpej lwp = NULL; 1038 1.1 thorpej cv_broadcast(&thread->tpt_cv); 1039 1.1 thorpej continue; 1040 1.1 thorpej } 1041 1.1 thorpej 1042 1.1 thorpej /* There are idle threads, so try giving one a job. */ 1043 1.4 thorpej struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs); 1044 1.23 riastrad 1045 1.13 thorpej /* 1046 1.13 thorpej * Take an extra reference on the job temporarily so that 1047 1.13 thorpej * it won't disappear on us while we have both locks dropped. 1048 1.13 thorpej */ 1049 1.13 thorpej threadpool_job_hold(job); 1050 1.1 thorpej mutex_spin_exit(&pool->tp_lock); 1051 1.1 thorpej 1052 1.1 thorpej mutex_enter(job->job_lock); 1053 1.1 thorpej /* If the job was cancelled, we'll no longer be its thread. */ 1054 1.21 riastrad if (__predict_true(job->job_thread == dispatcher)) { 1055 1.1 thorpej mutex_spin_enter(&pool->tp_lock); 1056 1.23 riastrad TAILQ_REMOVE(&pool->tp_jobs, job, job_entry); 1057 1.1 thorpej if (__predict_false( 1058 1.1 thorpej TAILQ_EMPTY(&pool->tp_idle_threads))) { 1059 1.1 thorpej /* 1060 1.1 thorpej * Someone else snagged the thread 1061 1.1 thorpej * first. We'll have to try again. 1062 1.1 thorpej */ 1063 1.17 riastrad SDT_PROBE2(sdt, kernel, threadpool, 1064 1.21 riastrad dispatcher__race, pool, job); 1065 1.1 thorpej TAILQ_INSERT_HEAD(&pool->tp_jobs, job, 1066 1.1 thorpej job_entry); 1067 1.1 thorpej } else { 1068 1.1 thorpej /* 1069 1.1 thorpej * Assign the job to the thread and 1070 1.1 thorpej * wake the thread so it starts work. 1071 1.1 thorpej */ 1072 1.1 thorpej struct threadpool_thread *const thread = 1073 1.1 thorpej TAILQ_FIRST(&pool->tp_idle_threads); 1074 1.1 thorpej 1075 1.17 riastrad SDT_PROBE2(sdt, kernel, threadpool, 1076 1.21 riastrad dispatcher__assign, job, thread->tpt_lwp); 1077 1.1 thorpej KASSERT(thread->tpt_job == NULL); 1078 1.1 thorpej TAILQ_REMOVE(&pool->tp_idle_threads, thread, 1079 1.1 thorpej tpt_entry); 1080 1.1 thorpej thread->tpt_job = job; 1081 1.1 thorpej job->job_thread = thread; 1082 1.1 thorpej cv_broadcast(&thread->tpt_cv); 1083 1.1 thorpej } 1084 1.1 thorpej mutex_spin_exit(&pool->tp_lock); 1085 1.1 thorpej } 1086 1.13 thorpej threadpool_job_rele(job); 1087 1.1 thorpej mutex_exit(job->job_lock); 1088 1.1 thorpej 1089 1.1 thorpej mutex_spin_enter(&pool->tp_lock); 1090 1.1 thorpej } 1091 1.7 thorpej threadpool_rele(pool); 1092 1.1 thorpej mutex_spin_exit(&pool->tp_lock); 1093 1.1 thorpej 1094 1.21 riastrad SDT_PROBE1(sdt, kernel, threadpool, dispatcher__exit, pool); 1095 1.1 thorpej 1096 1.1 thorpej kthread_exit(0); 1097 1.1 thorpej } 1098 1.1 thorpej 1099 1.1 thorpej /* Thread pool thread */ 1100 1.1 thorpej 1101 1.1 thorpej static void __dead 1102 1.1 thorpej threadpool_thread(void *arg) 1103 1.1 thorpej { 1104 1.1 thorpej struct threadpool_thread *const thread = arg; 1105 1.4 thorpej struct threadpool *const pool = thread->tpt_pool; 1106 1.1 thorpej 1107 1.1 thorpej KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu())); 1108 1.17 riastrad KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND)); 1109 1.1 thorpej 1110 1.1 thorpej /* Wait until we're initialized and on the queue. */ 1111 1.1 thorpej mutex_spin_enter(&pool->tp_lock); 1112 1.1 thorpej while (thread->tpt_lwp == NULL) 1113 1.1 thorpej cv_wait(&thread->tpt_cv, &pool->tp_lock); 1114 1.1 thorpej 1115 1.17 riastrad SDT_PROBE1(sdt, kernel, threadpool, thread__start, pool); 1116 1.1 thorpej 1117 1.1 thorpej KASSERT(thread->tpt_lwp == curlwp); 1118 1.1 thorpej for (;;) { 1119 1.1 thorpej /* Wait until we are assigned a job. */ 1120 1.1 thorpej while (thread->tpt_job == NULL) { 1121 1.1 thorpej if (ISSET(pool->tp_flags, THREADPOOL_DYING)) { 1122 1.17 riastrad SDT_PROBE1(sdt, kernel, threadpool, 1123 1.17 riastrad thread__dying, pool); 1124 1.1 thorpej break; 1125 1.1 thorpej } 1126 1.1 thorpej if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock, 1127 1.14 thorpej mstohz(threadpool_idle_time_ms))) 1128 1.1 thorpej break; 1129 1.1 thorpej } 1130 1.1 thorpej if (__predict_false(thread->tpt_job == NULL)) { 1131 1.1 thorpej TAILQ_REMOVE(&pool->tp_idle_threads, thread, 1132 1.1 thorpej tpt_entry); 1133 1.1 thorpej break; 1134 1.1 thorpej } 1135 1.1 thorpej 1136 1.4 thorpej struct threadpool_job *const job = thread->tpt_job; 1137 1.1 thorpej KASSERT(job != NULL); 1138 1.1 thorpej 1139 1.1 thorpej /* Set our lwp name to reflect what job we're doing. */ 1140 1.1 thorpej lwp_lock(curlwp); 1141 1.12 thorpej char *const lwp_name __diagused = curlwp->l_name; 1142 1.12 thorpej thread->tpt_lwp_savedname = curlwp->l_name; 1143 1.1 thorpej curlwp->l_name = job->job_name; 1144 1.1 thorpej lwp_unlock(curlwp); 1145 1.1 thorpej 1146 1.12 thorpej mutex_spin_exit(&pool->tp_lock); 1147 1.12 thorpej 1148 1.17 riastrad SDT_PROBE2(sdt, kernel, threadpool, thread__job, pool, job); 1149 1.12 thorpej 1150 1.1 thorpej /* Run the job. */ 1151 1.4 thorpej (*job->job_fn)(job); 1152 1.1 thorpej 1153 1.12 thorpej /* lwp name restored in threadpool_job_done(). */ 1154 1.12 thorpej KASSERTMSG((curlwp->l_name == lwp_name), 1155 1.12 thorpej "someone forgot to call threadpool_job_done()!"); 1156 1.1 thorpej 1157 1.13 thorpej /* 1158 1.13 thorpej * We can compare pointers, but we can no longer deference 1159 1.13 thorpej * job after this because threadpool_job_done() drops the 1160 1.13 thorpej * last reference on the job while the job is locked. 1161 1.13 thorpej */ 1162 1.1 thorpej 1163 1.1 thorpej mutex_spin_enter(&pool->tp_lock); 1164 1.1 thorpej KASSERT(thread->tpt_job == job); 1165 1.1 thorpej thread->tpt_job = NULL; 1166 1.1 thorpej TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry); 1167 1.1 thorpej } 1168 1.7 thorpej threadpool_rele(pool); 1169 1.1 thorpej mutex_spin_exit(&pool->tp_lock); 1170 1.1 thorpej 1171 1.17 riastrad SDT_PROBE1(sdt, kernel, threadpool, thread__exit, pool); 1172 1.1 thorpej 1173 1.1 thorpej KASSERT(!cv_has_waiters(&thread->tpt_cv)); 1174 1.1 thorpej cv_destroy(&thread->tpt_cv); 1175 1.1 thorpej pool_cache_put(threadpool_thread_pc, thread); 1176 1.1 thorpej kthread_exit(0); 1177 1.1 thorpej } 1178