/*	$NetBSD: threadpool.c,v 1.6 2024/02/02 21:52:23 andvar Exp $	*/

/*-
 * Copyright (c) 2018 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Jason R. Thorpe.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND
 * CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
 * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
 * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
 * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
     33 #include <sys/cdefs.h>
     34 #if !defined(lint)
     35 __RCSID("$NetBSD: threadpool.c,v 1.6 2024/02/02 21:52:23 andvar Exp $");
     36 #endif /* !lint */
     37 
     38 #include <sys/param.h>
     39 #include <sys/condvar.h>
     40 #include <sys/kernel.h>
     41 #include <sys/kmem.h>
     42 #include <sys/mutex.h>
     43 #include <sys/threadpool.h>
     44 
     45 #include "kernspace.h"
     46 
     47 void
     48 rumptest_threadpool_unbound_lifecycle(void)
     49 {
     50 	struct threadpool *pool0, *pool1, *pool2;
     51 	int error;
     52 
     53 	error = threadpool_get(&pool0, PRI_NONE);
     54 	KASSERT(error == 0);
     55 
     56 	error = threadpool_get(&pool1, PRI_NONE);
     57 	KASSERT(error == 0);
     58 
     59 	KASSERT(pool0 == pool1);
     60 
     61 	error = threadpool_get(&pool2, PRI_KERNEL_RT);
     62 	KASSERT(error == 0);
     63 
     64 	KASSERT(pool0 != pool2);
     65 
     66 	threadpool_put(pool0, PRI_NONE);
     67 	threadpool_put(pool1, PRI_NONE);
     68 	threadpool_put(pool2, PRI_KERNEL_RT);
     69 }
     70 
     71 void
     72 rumptest_threadpool_percpu_lifecycle(void)
     73 {
     74 	struct threadpool_percpu *pcpu0, *pcpu1, *pcpu2;
     75 	int error;
     76 
     77 	error = threadpool_percpu_get(&pcpu0, PRI_NONE);
     78 	KASSERT(error == 0);
     79 
     80 	error = threadpool_percpu_get(&pcpu1, PRI_NONE);
     81 	KASSERT(error == 0);
     82 
     83 	KASSERT(pcpu0 == pcpu1);
     84 
     85 	error = threadpool_percpu_get(&pcpu2, PRI_KERNEL_RT);
     86 	KASSERT(error == 0);
     87 
     88 	KASSERT(pcpu0 != pcpu2);
     89 
     90 	threadpool_percpu_put(pcpu0, PRI_NONE);
     91 	threadpool_percpu_put(pcpu1, PRI_NONE);
     92 	threadpool_percpu_put(pcpu2, PRI_KERNEL_RT);
     93 }
     94 
     95 struct test_job_data {
     96 	kmutex_t mutex;
     97 	kcondvar_t cond;
     98 	unsigned int count;
     99 	struct threadpool_job job;
    100 };
    101 
/*
 * Value of test_job_data.count that the tests treat as "finished":
 * the schedule tests keep rescheduling the job until count reaches it,
 * and the cancel job stores it just before calling
 * threadpool_job_done().
 */
enum { FINAL_COUNT = 12345 };
    103 
    104 static void
    105 test_job_func_schedule(struct threadpool_job *job)
    106 {
    107 	struct test_job_data *data =
    108 	    container_of(job, struct test_job_data, job);
    109 
    110 	mutex_enter(&data->mutex);
    111 	KASSERT(data->count != FINAL_COUNT);
    112 	data->count++;
    113 	cv_broadcast(&data->cond);
    114 	threadpool_job_done(job);
    115 	mutex_exit(&data->mutex);
    116 }
    117 
    118 static void
    119 test_job_func_cancel(struct threadpool_job *job)
    120 {
    121 	struct test_job_data *data =
    122 	    container_of(job, struct test_job_data, job);
    123 
    124 	mutex_enter(&data->mutex);
    125 	if (data->count == 0) {
    126 		data->count = 1;
    127 		cv_broadcast(&data->cond);
    128 	}
    129 	while (data->count != FINAL_COUNT - 1)
    130 		cv_wait(&data->cond, &data->mutex);
    131 	data->count = FINAL_COUNT;
    132 	cv_broadcast(&data->cond);
    133 	threadpool_job_done(job);
    134 	mutex_exit(&data->mutex);
    135 }
    136 
    137 static void
    138 init_test_job_data(struct test_job_data *data, threadpool_job_fn_t fn)
    139 {
    140 	mutex_init(&data->mutex, MUTEX_DEFAULT, IPL_NONE);
    141 	cv_init(&data->cond, "testjob");
    142 	threadpool_job_init(&data->job, fn, &data->mutex, "testjob");
    143 	data->count = 0;
    144 }
    145 
    146 static void
    147 fini_test_job_data(struct test_job_data *data)
    148 {
    149 	threadpool_job_destroy(&data->job);
    150 	cv_destroy(&data->cond);
    151 	mutex_destroy(&data->mutex);
    152 }
    153 
    154 void
    155 rumptest_threadpool_unbound_schedule(void)
    156 {
    157 	struct test_job_data data;
    158 	struct threadpool *pool;
    159 	int error;
    160 
    161 	error = threadpool_get(&pool, PRI_NONE);
    162 	KASSERT(error == 0);
    163 
    164 	init_test_job_data(&data, test_job_func_schedule);
    165 
    166 	mutex_enter(&data.mutex);
    167 	while (data.count != FINAL_COUNT) {
    168 		threadpool_schedule_job(pool, &data.job);
    169 		error = cv_timedwait(&data.cond, &data.mutex, hz * 2);
    170 		KASSERT(error != EWOULDBLOCK);
    171 	}
    172 	mutex_exit(&data.mutex);
    173 
    174 	fini_test_job_data(&data);
    175 
    176 	threadpool_put(pool, PRI_NONE);
    177 }
    178 
    179 void
    180 rumptest_threadpool_percpu_schedule(void)
    181 {
    182 	struct test_job_data data;
    183 	struct threadpool_percpu *pcpu;
    184 	struct threadpool *pool;
    185 	int error;
    186 
    187 	error = threadpool_percpu_get(&pcpu, PRI_NONE);
    188 	KASSERT(error == 0);
    189 
    190 	pool = threadpool_percpu_ref(pcpu);
    191 
    192 	init_test_job_data(&data, test_job_func_schedule);
    193 
    194 	mutex_enter(&data.mutex);
    195 	while (data.count != FINAL_COUNT) {
    196 		threadpool_schedule_job(pool, &data.job);
    197 		error = cv_timedwait(&data.cond, &data.mutex, hz * 2);
    198 		KASSERT(error != EWOULDBLOCK);
    199 	}
    200 	mutex_exit(&data.mutex);
    201 
    202 	fini_test_job_data(&data);
    203 
    204 	threadpool_percpu_put(pcpu, PRI_NONE);
    205 }
    206 
    207 void
    208 rumptest_threadpool_job_cancel(void)
    209 {
    210 	struct test_job_data data;
    211 	struct threadpool *pool;
    212 	int error;
    213 	bool rv;
    214 
    215 	error = threadpool_get(&pool, PRI_NONE);
    216 	KASSERT(error == 0);
    217 
    218 	init_test_job_data(&data, test_job_func_cancel);
    219 
    220 	mutex_enter(&data.mutex);
    221 	threadpool_schedule_job(pool, &data.job);
    222 	while (data.count == 0)
    223 		cv_wait(&data.cond, &data.mutex);
    224 	KASSERT(data.count == 1);
    225 
    226 	/* Job is already running (and is not finished); this should fail. */
    227 	rv = threadpool_cancel_job_async(pool, &data.job);
    228 	KASSERT(rv == false);
    229 
    230 	data.count = FINAL_COUNT - 1;
    231 	cv_broadcast(&data.cond);
    232 
    233 	/* Now wait for the job to finish. */
    234 	threadpool_cancel_job(pool, &data.job);
    235 	KASSERT(data.count == FINAL_COUNT);
    236 	mutex_exit(&data.mutex);
    237 
    238 	fini_test_job_data(&data);
    239 
    240 	threadpool_put(pool, PRI_NONE);
    241 }
    242 
    243 void
    244 rumptest_threadpool_job_cancelthrash(void)
    245 {
    246 	struct test_job_data data;
    247 	struct threadpool *pool;
    248 	int i, error;
    249 
    250 	error = threadpool_get(&pool, PRI_NONE);
    251 	KASSERT(error == 0);
    252 
    253 	init_test_job_data(&data, test_job_func_cancel);
    254 
    255 	mutex_enter(&data.mutex);
    256 	for (i = 0; i < 10000; i++) {
    257 		threadpool_schedule_job(pool, &data.job);
    258 		if ((i % 3) == 0) {
    259 			mutex_exit(&data.mutex);
    260 			mutex_enter(&data.mutex);
    261 		}
    262 		/*
    263 		 * If the job managed to start, ensure that its exit
    264 		 * condition is met so that we don't wait forever
    265 		 * for the job to finish.
    266 		 */
    267 		data.count = FINAL_COUNT - 1;
    268 		cv_broadcast(&data.cond);
    269 
    270 		threadpool_cancel_job(pool, &data.job);
    271 
    272 		/*
    273 		 * After cancellation, either the job didn't start
    274 		 * (data.count == FINAL_COUNT - 1, per above) or
    275 		 * it finished (data.count == FINAL_COUNT).
    276 		 */
    277 		KASSERT(data.count == (FINAL_COUNT - 1) ||
    278 		    data.count == FINAL_COUNT);
    279 
    280 		/* Reset for the loop. */
    281 		data.count = 0;
    282 	}
    283 	mutex_exit(&data.mutex);
    284 
    285 	fini_test_job_data(&data);
    286 
    287 	threadpool_put(pool, PRI_NONE);
    288 }
    289