1 1.6 andvar /* $NetBSD: threadpool.c,v 1.6 2024/02/02 21:52:23 andvar Exp $ */ 2 1.1 thorpej 3 1.1 thorpej /*- 4 1.1 thorpej * Copyright (c) 2018 The NetBSD Foundation, Inc. 5 1.1 thorpej * All rights reserved. 6 1.1 thorpej * 7 1.1 thorpej * This code is derived from software contributed to The NetBSD Foundation 8 1.1 thorpej * by Jason R. Thorpe. 9 1.1 thorpej * 10 1.1 thorpej * Redistribution and use in source and binary forms, with or without 11 1.1 thorpej * modification, are permitted provided that the following conditions 12 1.1 thorpej * are met: 13 1.1 thorpej * 1. Redistributions of source code must retain the above copyright 14 1.1 thorpej * notice, this list of conditions and the following disclaimer. 15 1.1 thorpej * 2. Redistributions in binary form must reproduce the above copyright 16 1.1 thorpej * notice, this list of conditions and the following disclaimer in the 17 1.1 thorpej * documentation and/or other materials provided with the distribution. 18 1.1 thorpej * 19 1.1 thorpej * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND 20 1.1 thorpej * CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, 21 1.1 thorpej * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 22 1.1 thorpej * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 23 1.1 thorpej * IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY 24 1.1 thorpej * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 25 1.1 thorpej * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE 26 1.1 thorpej * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 1.1 thorpej * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER 28 1.1 thorpej * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 29 1.1 thorpej * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN 30 1.1 thorpej * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 1.1 thorpej */ 32 1.1 thorpej 33 1.1 thorpej #include <sys/cdefs.h> 34 1.1 thorpej #if !defined(lint) 35 1.6 andvar __RCSID("$NetBSD: threadpool.c,v 1.6 2024/02/02 21:52:23 andvar Exp $"); 36 1.1 thorpej #endif /* !lint */ 37 1.1 thorpej 38 1.1 thorpej #include <sys/param.h> 39 1.1 thorpej #include <sys/condvar.h> 40 1.1 thorpej #include <sys/kernel.h> 41 1.1 thorpej #include <sys/kmem.h> 42 1.1 thorpej #include <sys/mutex.h> 43 1.1 thorpej #include <sys/threadpool.h> 44 1.1 thorpej 45 1.1 thorpej #include "kernspace.h" 46 1.1 thorpej 47 1.1 thorpej void 48 1.1 thorpej rumptest_threadpool_unbound_lifecycle(void) 49 1.1 thorpej { 50 1.3 thorpej struct threadpool *pool0, *pool1, *pool2; 51 1.1 thorpej int error; 52 1.1 thorpej 53 1.1 thorpej error = threadpool_get(&pool0, PRI_NONE); 54 1.1 thorpej KASSERT(error == 0); 55 1.1 thorpej 56 1.1 thorpej error = threadpool_get(&pool1, PRI_NONE); 57 1.1 thorpej KASSERT(error == 0); 58 1.1 thorpej 59 1.1 thorpej KASSERT(pool0 == pool1); 60 1.1 thorpej 61 1.1 thorpej error = threadpool_get(&pool2, PRI_KERNEL_RT); 62 1.1 thorpej KASSERT(error == 0); 63 1.1 thorpej 64 1.1 thorpej KASSERT(pool0 != pool2); 65 1.1 thorpej 66 1.1 thorpej threadpool_put(pool0, PRI_NONE); 67 1.1 thorpej threadpool_put(pool1, PRI_NONE); 68 1.1 thorpej threadpool_put(pool2, PRI_KERNEL_RT); 69 1.1 thorpej } 70 1.1 thorpej 71 1.1 thorpej void 72 1.1 thorpej rumptest_threadpool_percpu_lifecycle(void) 73 1.1 thorpej { 74 1.3 thorpej struct threadpool_percpu *pcpu0, *pcpu1, *pcpu2; 75 1.1 thorpej int error; 76 1.1 thorpej 77 1.1 thorpej error = threadpool_percpu_get(&pcpu0, PRI_NONE); 78 1.1 thorpej KASSERT(error == 0); 79 1.1 thorpej 80 1.1 thorpej error = threadpool_percpu_get(&pcpu1, PRI_NONE); 81 1.1 thorpej KASSERT(error == 0); 82 1.1 thorpej 83 1.1 thorpej KASSERT(pcpu0 == pcpu1); 84 1.1 thorpej 85 1.1 thorpej error = threadpool_percpu_get(&pcpu2, PRI_KERNEL_RT); 86 1.1 thorpej KASSERT(error == 0); 87 1.1 thorpej 88 1.1 thorpej KASSERT(pcpu0 != pcpu2); 89 1.1 thorpej 90 1.1 thorpej threadpool_percpu_put(pcpu0, PRI_NONE); 91 1.1 thorpej threadpool_percpu_put(pcpu1, PRI_NONE); 92 1.1 thorpej threadpool_percpu_put(pcpu2, PRI_KERNEL_RT); 93 1.1 thorpej } 94 1.1 thorpej 95 1.1 thorpej struct test_job_data { 96 1.1 thorpej kmutex_t mutex; 97 1.1 thorpej kcondvar_t cond; 98 1.1 thorpej unsigned int count; 99 1.3 thorpej struct threadpool_job job; 100 1.1 thorpej }; 101 1.1 thorpej 102 1.1 thorpej #define FINAL_COUNT 12345 103 1.1 thorpej 104 1.1 thorpej static void 105 1.3 thorpej test_job_func_schedule(struct threadpool_job *job) 106 1.1 thorpej { 107 1.1 thorpej struct test_job_data *data = 108 1.1 thorpej container_of(job, struct test_job_data, job); 109 1.1 thorpej 110 1.1 thorpej mutex_enter(&data->mutex); 111 1.1 thorpej KASSERT(data->count != FINAL_COUNT); 112 1.1 thorpej data->count++; 113 1.1 thorpej cv_broadcast(&data->cond); 114 1.1 thorpej threadpool_job_done(job); 115 1.1 thorpej mutex_exit(&data->mutex); 116 1.1 thorpej } 117 1.1 thorpej 118 1.1 thorpej static void 119 1.3 thorpej test_job_func_cancel(struct threadpool_job *job) 120 1.1 thorpej { 121 1.1 thorpej struct test_job_data *data = 122 1.1 thorpej container_of(job, struct test_job_data, job); 123 1.5 thorpej 124 1.1 thorpej mutex_enter(&data->mutex); 125 1.5 thorpej if (data->count == 0) { 126 1.5 thorpej data->count = 1; 127 1.5 thorpej cv_broadcast(&data->cond); 128 1.5 thorpej } 129 1.1 thorpej while (data->count != FINAL_COUNT - 1) 130 1.1 thorpej cv_wait(&data->cond, &data->mutex); 131 1.1 thorpej data->count = FINAL_COUNT; 132 1.1 thorpej cv_broadcast(&data->cond); 133 1.1 thorpej threadpool_job_done(job); 134 1.1 thorpej mutex_exit(&data->mutex); 135 1.1 thorpej } 136 1.1 thorpej 137 1.1 thorpej static void 138 1.1 thorpej init_test_job_data(struct test_job_data *data, threadpool_job_fn_t fn) 139 1.1 thorpej { 140 1.1 thorpej mutex_init(&data->mutex, MUTEX_DEFAULT, IPL_NONE); 141 1.1 thorpej cv_init(&data->cond, "testjob"); 142 1.1 thorpej threadpool_job_init(&data->job, fn, &data->mutex, "testjob"); 143 1.1 thorpej data->count = 0; 144 1.1 thorpej } 145 1.1 thorpej 146 1.1 thorpej static void 147 1.1 thorpej fini_test_job_data(struct test_job_data *data) 148 1.1 thorpej { 149 1.1 thorpej threadpool_job_destroy(&data->job); 150 1.1 thorpej cv_destroy(&data->cond); 151 1.1 thorpej mutex_destroy(&data->mutex); 152 1.1 thorpej } 153 1.1 thorpej 154 1.1 thorpej void 155 1.1 thorpej rumptest_threadpool_unbound_schedule(void) 156 1.1 thorpej { 157 1.1 thorpej struct test_job_data data; 158 1.3 thorpej struct threadpool *pool; 159 1.1 thorpej int error; 160 1.1 thorpej 161 1.1 thorpej error = threadpool_get(&pool, PRI_NONE); 162 1.1 thorpej KASSERT(error == 0); 163 1.1 thorpej 164 1.1 thorpej init_test_job_data(&data, test_job_func_schedule); 165 1.1 thorpej 166 1.1 thorpej mutex_enter(&data.mutex); 167 1.1 thorpej while (data.count != FINAL_COUNT) { 168 1.1 thorpej threadpool_schedule_job(pool, &data.job); 169 1.1 thorpej error = cv_timedwait(&data.cond, &data.mutex, hz * 2); 170 1.1 thorpej KASSERT(error != EWOULDBLOCK); 171 1.1 thorpej } 172 1.1 thorpej mutex_exit(&data.mutex); 173 1.1 thorpej 174 1.1 thorpej fini_test_job_data(&data); 175 1.1 thorpej 176 1.1 thorpej threadpool_put(pool, PRI_NONE); 177 1.1 thorpej } 178 1.1 thorpej 179 1.1 thorpej void 180 1.1 thorpej rumptest_threadpool_percpu_schedule(void) 181 1.1 thorpej { 182 1.1 thorpej struct test_job_data data; 183 1.3 thorpej struct threadpool_percpu *pcpu; 184 1.3 thorpej struct threadpool *pool; 185 1.1 thorpej int error; 186 1.1 thorpej 187 1.1 thorpej error = threadpool_percpu_get(&pcpu, PRI_NONE); 188 1.1 thorpej KASSERT(error == 0); 189 1.1 thorpej 190 1.1 thorpej pool = threadpool_percpu_ref(pcpu); 191 1.1 thorpej 192 1.1 thorpej init_test_job_data(&data, test_job_func_schedule); 193 1.1 thorpej 194 1.1 thorpej mutex_enter(&data.mutex); 195 1.1 thorpej while (data.count != FINAL_COUNT) { 196 1.1 thorpej threadpool_schedule_job(pool, &data.job); 197 1.1 thorpej error = cv_timedwait(&data.cond, &data.mutex, hz * 2); 198 1.1 thorpej KASSERT(error != EWOULDBLOCK); 199 1.1 thorpej } 200 1.1 thorpej mutex_exit(&data.mutex); 201 1.1 thorpej 202 1.1 thorpej fini_test_job_data(&data); 203 1.1 thorpej 204 1.1 thorpej threadpool_percpu_put(pcpu, PRI_NONE); 205 1.1 thorpej } 206 1.1 thorpej 207 1.1 thorpej void 208 1.1 thorpej rumptest_threadpool_job_cancel(void) 209 1.1 thorpej { 210 1.1 thorpej struct test_job_data data; 211 1.3 thorpej struct threadpool *pool; 212 1.1 thorpej int error; 213 1.1 thorpej bool rv; 214 1.1 thorpej 215 1.1 thorpej error = threadpool_get(&pool, PRI_NONE); 216 1.1 thorpej KASSERT(error == 0); 217 1.1 thorpej 218 1.1 thorpej init_test_job_data(&data, test_job_func_cancel); 219 1.1 thorpej 220 1.1 thorpej mutex_enter(&data.mutex); 221 1.1 thorpej threadpool_schedule_job(pool, &data.job); 222 1.1 thorpej while (data.count == 0) 223 1.1 thorpej cv_wait(&data.cond, &data.mutex); 224 1.1 thorpej KASSERT(data.count == 1); 225 1.1 thorpej 226 1.6 andvar /* Job is already running (and is not finished); this should fail. */ 227 1.1 thorpej rv = threadpool_cancel_job_async(pool, &data.job); 228 1.1 thorpej KASSERT(rv == false); 229 1.1 thorpej 230 1.1 thorpej data.count = FINAL_COUNT - 1; 231 1.1 thorpej cv_broadcast(&data.cond); 232 1.1 thorpej 233 1.1 thorpej /* Now wait for the job to finish. */ 234 1.1 thorpej threadpool_cancel_job(pool, &data.job); 235 1.1 thorpej KASSERT(data.count == FINAL_COUNT); 236 1.2 thorpej mutex_exit(&data.mutex); 237 1.2 thorpej 238 1.2 thorpej fini_test_job_data(&data); 239 1.2 thorpej 240 1.2 thorpej threadpool_put(pool, PRI_NONE); 241 1.1 thorpej } 242 1.4 thorpej 243 1.4 thorpej void 244 1.4 thorpej rumptest_threadpool_job_cancelthrash(void) 245 1.4 thorpej { 246 1.4 thorpej struct test_job_data data; 247 1.4 thorpej struct threadpool *pool; 248 1.4 thorpej int i, error; 249 1.4 thorpej 250 1.4 thorpej error = threadpool_get(&pool, PRI_NONE); 251 1.4 thorpej KASSERT(error == 0); 252 1.4 thorpej 253 1.4 thorpej init_test_job_data(&data, test_job_func_cancel); 254 1.4 thorpej 255 1.4 thorpej mutex_enter(&data.mutex); 256 1.4 thorpej for (i = 0; i < 10000; i++) { 257 1.4 thorpej threadpool_schedule_job(pool, &data.job); 258 1.4 thorpej if ((i % 3) == 0) { 259 1.4 thorpej mutex_exit(&data.mutex); 260 1.4 thorpej mutex_enter(&data.mutex); 261 1.4 thorpej } 262 1.5 thorpej /* 263 1.5 thorpej * If the job managed to start, ensure that its exit 264 1.5 thorpej * condition is met so that we don't wait forever 265 1.5 thorpej * for the job to finish. 266 1.5 thorpej */ 267 1.5 thorpej data.count = FINAL_COUNT - 1; 268 1.5 thorpej cv_broadcast(&data.cond); 269 1.5 thorpej 270 1.4 thorpej threadpool_cancel_job(pool, &data.job); 271 1.5 thorpej 272 1.5 thorpej /* 273 1.5 thorpej * After cancellation, either the job didn't start 274 1.5 thorpej * (data.count == FINAL_COUNT - 1, per above) or 275 1.5 thorpej * it finished (data.count == FINAL_COUNT). 276 1.5 thorpej */ 277 1.5 thorpej KASSERT(data.count == (FINAL_COUNT - 1) || 278 1.5 thorpej data.count == FINAL_COUNT); 279 1.5 thorpej 280 1.5 thorpej /* Reset for the loop. */ 281 1.5 thorpej data.count = 0; 282 1.4 thorpej } 283 1.4 thorpej mutex_exit(&data.mutex); 284 1.4 thorpej 285 1.4 thorpej fini_test_job_data(&data); 286 1.4 thorpej 287 1.4 thorpej threadpool_put(pool, PRI_NONE); 288 1.4 thorpej } 289