/*	$NetBSD: subr_workqueue.c,v 1.45 2023/08/09 08:23:45 riastradh Exp $	*/

/*-
 * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.45 2023/08/09 08:23:45 riastradh Exp $");

#include <sys/param.h>
#include <sys/cpu.h>
#include <sys/systm.h>
#include <sys/kthread.h>
#include <sys/kmem.h>
#include <sys/proc.h>
#include <sys/workqueue.h>
#include <sys/mutex.h>
#include <sys/condvar.h>
#include <sys/sdt.h>
#include <sys/queue.h>

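/*
 * workqueue(9): queues of deferred work items processed by dedicated
 * worker kthreads, either a single global worker or one worker per CPU
 * (WQ_PERCPU).
 */
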
typedef struct work_impl {
	SIMPLEQ_ENTRY(work_impl) wk_entry;
} work_impl_t;

SIMPLEQ_HEAD(workqhead, work_impl);

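/*
 * Per-queue state.  q_gen is a generation counter protected by
 * q_mutex: it is odd while the worker is running a detached batch of
 * work and even while the worker is idle; workqueue_wait() watches it
 * to detect the completion of an in-flight batch.
 */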
struct workqueue_queue {
	kmutex_t q_mutex;
	kcondvar_t q_cv;
	struct workqhead q_queue_pending;
	uint64_t q_gen;
	lwp_t *q_worker;
};

struct workqueue {
	void (*wq_func)(struct work *, void *);
	void *wq_arg;
	int wq_flags;

	char wq_name[MAXCOMLEN];
	pri_t wq_prio;
	void *wq_ptr;
};

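/*
 * A workqueue allocation is laid out as the struct workqueue header,
 * rounded up to a cache line, followed by one cache-line-sized
 * workqueue_queue per CPU (for WQ_PERCPU) or a single one, plus slack
 * so the whole block can be cache-line aligned; see workqueue_size()
 * and workqueue_queue_lookup().
 */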
#define	WQ_SIZE		(roundup2(sizeof(struct workqueue), coherency_unit))
#define	WQ_QUEUE_SIZE	(roundup2(sizeof(struct workqueue_queue), coherency_unit))

#define	POISON	0xaabbccdd

SDT_PROBE_DEFINE7(sdt, kernel, workqueue, create,
    "struct workqueue *"/*wq*/,
    "const char *"/*name*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/,
    "pri_t"/*prio*/,
    "int"/*ipl*/,
    "int"/*flags*/);
SDT_PROBE_DEFINE1(sdt, kernel, workqueue, destroy,
    "struct workqueue *"/*wq*/);

SDT_PROBE_DEFINE3(sdt, kernel, workqueue, enqueue,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "struct cpu_info *"/*ci*/);
SDT_PROBE_DEFINE4(sdt, kernel, workqueue, entry,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/);
SDT_PROBE_DEFINE4(sdt, kernel, workqueue, return,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__start,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__self,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__hit,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__done,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);

SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__start,
    "struct workqueue *"/*wq*/);
SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__done,
    "struct workqueue *"/*wq*/);

static size_t
workqueue_size(int flags)
{

	return WQ_SIZE
	    + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
	    + coherency_unit;
}

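/*
 * Look up the workqueue_queue embedded in wq for the given CPU:
 * per-CPU workqueues index by cpu_index(ci) (or the current CPU when
 * ci is NULL); other workqueues always use the single queue 0.
 */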
static struct workqueue_queue *
workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
{
	u_int idx = 0;

	if (wq->wq_flags & WQ_PERCPU) {
		idx = ci ? cpu_index(ci) : cpu_index(curcpu());
	}

	return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
}

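/*
 * Run every work item on the given list, in order, calling the
 * workqueue's callback for each one.  The list must already have been
 * detached from its queue; no locks are held across the callbacks.
 */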
static void
workqueue_runlist(struct workqueue *wq, struct workqhead *list)
{
	work_impl_t *wk;
	work_impl_t *next;

	for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
		next = SIMPLEQ_NEXT(wk, wk_entry);
		SDT_PROBE4(sdt, kernel, workqueue, entry,
		    wq, wk, wq->wq_func, wq->wq_arg);
		(*wq->wq_func)((void *)wk, wq->wq_arg);
		SDT_PROBE4(sdt, kernel, workqueue, return,
		    wq, wk, wq->wq_func, wq->wq_arg);
	}
}

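/*
 * Worker kthread: repeatedly wait for pending work, detach the whole
 * pending list, and run it with the queue unlocked.  q_gen is made odd
 * while a batch is in flight and bumped back to even (with a
 * broadcast) when the batch completes, for workqueue_wait().
 */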
static void
workqueue_worker(void *cookie)
{
	struct workqueue *wq = cookie;
	struct workqueue_queue *q;
	int s;

	/* find the workqueue_queue this kthread serves */
	q = workqueue_queue_lookup(wq, curlwp->l_cpu);

	if (wq->wq_flags & WQ_FPU)
		s = kthread_fpu_enter();
	mutex_enter(&q->q_mutex);
	for (;;) {
		struct workqhead tmp;

		SIMPLEQ_INIT(&tmp);

		while (SIMPLEQ_EMPTY(&q->q_queue_pending))
			cv_wait(&q->q_cv, &q->q_mutex);
		SIMPLEQ_CONCAT(&tmp, &q->q_queue_pending);
		SIMPLEQ_INIT(&q->q_queue_pending);

		/*
		 * Mark the queue as actively running a batch of work
		 * by setting the generation number odd.
		 */
		q->q_gen |= 1;
		mutex_exit(&q->q_mutex);

		workqueue_runlist(wq, &tmp);

		/*
		 * Notify workqueue_wait that we have completed a batch
		 * of work by incrementing the generation number.
		 */
		mutex_enter(&q->q_mutex);
		KASSERTMSG(q->q_gen & 1, "q=%p gen=%"PRIu64, q, q->q_gen);
		q->q_gen++;
		cv_broadcast(&q->q_cv);
	}
	mutex_exit(&q->q_mutex);
	if (wq->wq_flags & WQ_FPU)
		kthread_fpu_exit(s);
}

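/*
 * Initialize the workqueue header: name, scheduling priority, and the
 * callback invoked for each work item.
 */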
static void
workqueue_init(struct workqueue *wq, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl)
{

	KASSERT(sizeof(wq->wq_name) > strlen(name));
	strncpy(wq->wq_name, name, sizeof(wq->wq_name));

	wq->wq_prio = prio;
	wq->wq_func = callback_func;
	wq->wq_arg = callback_arg;
}

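/*
 * Initialize one workqueue_queue and create its worker kthread, bound
 * to the given CPU if ci is non-NULL.  On failure the queue's mutex
 * and condvar are destroyed again and the error is returned.
 */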
static int
workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
    int ipl, struct cpu_info *ci)
{
	int error, ktf;

	KASSERT(q->q_worker == NULL);

	mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
	cv_init(&q->q_cv, wq->wq_name);
	SIMPLEQ_INIT(&q->q_queue_pending);
	q->q_gen = 0;
	ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
	if (wq->wq_prio < PRI_KERNEL)
		ktf |= KTHREAD_TS;
	if (ci) {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
	} else {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s", wq->wq_name);
	}
	if (error != 0) {
		mutex_destroy(&q->q_mutex);
		cv_destroy(&q->q_cv);
		KASSERT(q->q_worker == NULL);
	}
	return error;
}

struct workqueue_exitargs {
	work_impl_t wqe_wk;
	struct workqueue_queue *wqe_q;
};

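/*
 * Work callback installed by workqueue_destroy(): run as the last item
 * on a queue, it clears q_worker, wakes workqueue_finiqueue(), and
 * terminates the worker kthread.
 */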
static void
workqueue_exit(struct work *wk, void *arg)
{
	struct workqueue_exitargs *wqe = (void *)wk;
	struct workqueue_queue *q = wqe->wqe_q;

	/*
	 * The only competition at this point is workqueue_finiqueue.
	 */

	KASSERT(q->q_worker == curlwp);
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
	mutex_enter(&q->q_mutex);
	q->q_worker = NULL;
	cv_broadcast(&q->q_cv);
	mutex_exit(&q->q_mutex);
	kthread_exit(0);
}

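/*
 * Tear down one queue: enqueue the exit work, wait for the worker
 * kthread to disappear, and destroy the queue's mutex and condvar.
 */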
static void
workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
{
	struct workqueue_exitargs wqe;

	KASSERT(wq->wq_func == workqueue_exit);

	wqe.wqe_q = q;
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
	KASSERT(q->q_worker != NULL);
	mutex_enter(&q->q_mutex);
	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
	cv_broadcast(&q->q_cv);
	while (q->q_worker != NULL) {
		cv_wait(&q->q_cv, &q->q_mutex);
	}
	mutex_exit(&q->q_mutex);
	mutex_destroy(&q->q_mutex);
	cv_destroy(&q->q_cv);
}

/* --- */

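/*
 * Typical use of the public interface below (a sketch only; the names
 * example_wq, example_work, and example_cb are illustrative, not part
 * of this file):
 *
 *	static void
 *	example_cb(struct work *wk, void *arg)
 *	{
 *		...
 *	}
 *
 *	error = workqueue_create(&example_wq, "example", example_cb, NULL,
 *	    PRI_NONE, IPL_NONE, WQ_MPSAFE);
 *	...
 *	workqueue_enqueue(example_wq, &example_work, NULL);
 *	...
 *	workqueue_wait(example_wq, &example_work);
 *	workqueue_destroy(example_wq);
 */
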
int
workqueue_create(struct workqueue **wqp, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl, int flags)
{
	struct workqueue *wq;
	struct workqueue_queue *q;
	void *ptr;
	int error = 0;

	CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));

	ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
	wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
	wq->wq_ptr = ptr;
	wq->wq_flags = flags;

	workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);

	if (flags & WQ_PERCPU) {
		struct cpu_info *ci;
		CPU_INFO_ITERATOR cii;

		/* create the work-queue for each CPU */
		for (CPU_INFO_FOREACH(cii, ci)) {
			q = workqueue_queue_lookup(wq, ci);
			error = workqueue_initqueue(wq, q, ipl, ci);
			if (error) {
				break;
			}
		}
	} else {
		/* initialize a work-queue */
		q = workqueue_queue_lookup(wq, NULL);
		error = workqueue_initqueue(wq, q, ipl, NULL);
	}

	if (error != 0) {
		workqueue_destroy(wq);
	} else {
		*wqp = wq;
	}

	return error;
}

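/*
 * Wait, on a single queue, for wk_target to finish.  We wait while the
 * work sits on the pending list and, if a batch is currently running,
 * until the generation counter moves past it.  Returns true iff the
 * work was found on this queue's pending list, in which case the
 * caller need not check the other queues.
 */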
static bool
workqueue_q_wait(struct workqueue *wq, struct workqueue_queue *q,
    work_impl_t *wk_target)
{
	work_impl_t *wk;
	bool found = false;
	uint64_t gen;

	mutex_enter(&q->q_mutex);

	/*
	 * Avoid a deadlock scenario.  We can't guarantee that
	 * wk_target has completed at this point, but we can't wait for
	 * it either, so do nothing.
	 *
	 * XXX Are there use-cases that require this semantics?
	 */
	if (q->q_worker == curlwp) {
		SDT_PROBE2(sdt, kernel, workqueue, wait__self,  wq, wk_target);
		goto out;
	}

	/*
	 * Wait until the target is no longer pending.  If we find it
	 * on this queue, the caller can stop looking in other queues.
	 * If we don't find it in this queue, however, we can't skip
	 * waiting -- it may be hidden in the running queue which we
	 * have no access to.
	 */
    again:
	SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) {
		if (wk == wk_target) {
			SDT_PROBE2(sdt, kernel, workqueue, wait__hit,  wq, wk);
			found = true;
			cv_wait(&q->q_cv, &q->q_mutex);
			goto again;
		}
	}

	/*
	 * The target may be in the batch of work currently running,
	 * but we can't touch that queue.  So if there's anything
	 * running, wait until the generation changes.
	 */
	gen = q->q_gen;
	if (gen & 1) {
		do
			cv_wait(&q->q_cv, &q->q_mutex);
		while (gen == q->q_gen);
	}

    out:
	mutex_exit(&q->q_mutex);

	return found;
}

/*
 * Wait for a specified work to finish.  The caller must ensure that no new
 * work will be enqueued before calling workqueue_wait.  Note that if the
 * workqueue is WQ_PERCPU, the caller may still enqueue new work on queues
 * other than the one being waited on.
 */
void
workqueue_wait(struct workqueue *wq, struct work *wk)
{
	struct workqueue_queue *q;
	bool found;

	ASSERT_SLEEPABLE();

	SDT_PROBE2(sdt, kernel, workqueue, wait__start,  wq, wk);
	if (ISSET(wq->wq_flags, WQ_PERCPU)) {
		struct cpu_info *ci;
		CPU_INFO_ITERATOR cii;
		for (CPU_INFO_FOREACH(cii, ci)) {
			q = workqueue_queue_lookup(wq, ci);
			found = workqueue_q_wait(wq, q, (work_impl_t *)wk);
			if (found)
				break;
		}
	} else {
		q = workqueue_queue_lookup(wq, NULL);
		(void)workqueue_q_wait(wq, q, (work_impl_t *)wk);
	}
	SDT_PROBE2(sdt, kernel, workqueue, wait__done,  wq, wk);
}

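/*
 * Destroy a workqueue.  The caller must ensure that no work is pending
 * and that none will be enqueued: each worker is shut down by running
 * workqueue_exit as its final work item before the memory is freed.
 */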
void
workqueue_destroy(struct workqueue *wq)
{
	struct workqueue_queue *q;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, workqueue, exit__start,  wq);
	wq->wq_func = workqueue_exit;
	for (CPU_INFO_FOREACH(cii, ci)) {
		q = workqueue_queue_lookup(wq, ci);
		if (q->q_worker != NULL) {
			workqueue_finiqueue(wq, q);
		}
	}
	SDT_PROBE1(sdt, kernel, workqueue, exit__done,  wq);
	kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
}

#ifdef DEBUG
static void
workqueue_check_duplication(struct workqueue_queue *q, work_impl_t *wk)
{
	work_impl_t *_wk;

	SIMPLEQ_FOREACH(_wk, &q->q_queue_pending, wk_entry) {
		if (_wk == wk)
			panic("%s: tried to enqueue a queued work", __func__);
	}
}
#endif

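/*
 * Enqueue a work item.  For WQ_PERCPU workqueues, ci selects the
 * target CPU's queue (or the current CPU's when NULL); otherwise ci
 * must be NULL.  The same struct work must not be enqueued again while
 * it is still pending.
 */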
void
workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
{
	struct workqueue_queue *q;
	work_impl_t *wk = (void *)wk0;

	SDT_PROBE3(sdt, kernel, workqueue, enqueue,  wq, wk0, ci);

	KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
	q = workqueue_queue_lookup(wq, ci);

	mutex_enter(&q->q_mutex);
#ifdef DEBUG
	workqueue_check_duplication(q, wk);
#endif
	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
	cv_broadcast(&q->q_cv);
	mutex_exit(&q->q_mutex);
    476 }
    477