Home | History | Annotate | Line # | Download | only in kern
      1 /*	$NetBSD: taskq.c,v 1.11 2019/08/20 08:12:50 hannken Exp $	*/
      2 
      3 /*-
      4  * Copyright (c) 2019 The NetBSD Foundation, Inc.
      5  * All rights reserved.
      6  *
      7  * This code is derived from software contributed to The NetBSD Foundation
      8  * by Juergen Hannken-Illjes.
      9  *
     10  * Redistribution and use in source and binary forms, with or without
     11  * modification, are permitted provided that the following conditions
     12  * are met:
     13  * 1. Redistributions of source code must retain the above copyright
     14  *    notice, this list of conditions and the following disclaimer.
     15  * 2. Redistributions in binary form must reproduce the above copyright
     16  *    notice, this list of conditions and the following disclaimer in the
     17  *    documentation and/or other materials provided with the distribution.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
     20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
     23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
     29  * POSSIBILITY OF SUCH DAMAGE.
     30  */
     31 
     32 #include <sys/types.h>
     33 #include <sys/param.h>
     34 #include <sys/kcondvar.h>
     35 #include <sys/kernel.h>
     36 #include <sys/kmem.h>
     37 #include <sys/mutex.h>
     38 #include <sys/proc.h>
     39 #include <sys/threadpool.h>
     40 
     41 #include <sys/taskq.h>
     42 
     43 struct taskq_executor {
     44 	struct threadpool_job te_job;	/* Threadpool job serving the queue. */
     45 	taskq_t *te_self;		/* Backpointer to the queue. */
     46 	unsigned te_running:1;		/* True if the job is running. */
     47 };
     48 
     49 struct taskq {
     50 	int tq_nthreads;		/* # of threads serving queue. */
     51 	pri_t tq_pri;			/* Scheduling priority. */
     52 	uint_t tq_flags;		/* Saved flags from taskq_create. */
     53 	int tq_active;			/* # of tasks (queued or running). */
     54 	int tq_running;			/* # of jobs currently running. */
     55 	int tq_waiting;			/* # of jobs currently idle. */
     56 	unsigned tq_destroyed:1;	/* True if queue gets destroyed. */
     57 	kmutex_t tq_lock;		/* Queue and job lock. */
     58 	kcondvar_t tq_cv;		/* Queue condvar. */
     59 	struct taskq_executor *tq_executor; /* Array of jobs. */
     60 	struct threadpool *tq_threadpool; /* Pool backing the jobs. */
     61 	SIMPLEQ_HEAD(, taskq_ent) tq_list; /* Queue of tasks waiting. */
     62 };
     63 
     64 taskq_t *system_taskq;			/* General purpose task queue. */
     65 
     66 static specificdata_key_t taskq_lwp_key; /* Null or taskq this thread runs. */
     67 
     68 /*
     69  * Threadpool job to service tasks from task queue.
     70  * Runs until the task queue gets destroyed or the queue is empty for 10 secs.
     71  */
     72 static void
     73 task_executor(struct threadpool_job *job)
     74 {
     75 	struct taskq_executor *state = (struct taskq_executor *)job;
     76 	taskq_t *tq = state->te_self;
     77 	taskq_ent_t *tqe;
     78 	bool is_dynamic;
     79 	int error;
     80 
     81 	lwp_setspecific(taskq_lwp_key, tq);
     82 
     83 	mutex_enter(&tq->tq_lock);
     84 	while (!tq->tq_destroyed) {
     85 		if (SIMPLEQ_EMPTY(&tq->tq_list)) {
     86 			if (ISSET(tq->tq_flags, TASKQ_DYNAMIC))
     87 				break;
     88 			tq->tq_waiting++;
     89 			error = cv_timedwait(&tq->tq_cv, &tq->tq_lock,
     90 			    mstohz(10000));
     91 			tq->tq_waiting--;
     92 			if (SIMPLEQ_EMPTY(&tq->tq_list)) {
     93 				if (error)
     94 					break;
     95 				continue;
     96 			}
     97 		}
     98 		tqe = SIMPLEQ_FIRST(&tq->tq_list);
     99 		KASSERT(tqe != NULL);
    100 		SIMPLEQ_REMOVE_HEAD(&tq->tq_list, tqent_list);
    101 		is_dynamic = tqe->tqent_dynamic;
    102 		tqe->tqent_queued = 0;
    103 		mutex_exit(&tq->tq_lock);
    104 
    105 		(*tqe->tqent_func)(tqe->tqent_arg);
    106 
    107 		mutex_enter(&tq->tq_lock);
    108 		if (is_dynamic)
    109 			kmem_free(tqe, sizeof(*tqe));
    110 		tq->tq_active--;
    111 	}
    112 	state->te_running = 0;
    113 	tq->tq_running--;
    114 	threadpool_job_done(job);
    115 	mutex_exit(&tq->tq_lock);
    116 
    117 	lwp_setspecific(taskq_lwp_key, NULL);
    118 }
    119 
    120 void
    121 taskq_init(void)
    122 {
    123 
    124 	lwp_specific_key_create(&taskq_lwp_key, NULL);
    125 	system_taskq = taskq_create("system_taskq", ncpu * 4, PRI_KERNEL,
    126 	    4, 512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
    127 	KASSERT(system_taskq != NULL);
    128 }
    129 
    130 void
    131 taskq_fini(void)
    132 {
    133 
    134 	taskq_destroy(system_taskq);
    135 	lwp_specific_key_delete(taskq_lwp_key);
    136 }
    137 
    138 /*
    139  * Dispatch a task entry creating executors as neeeded.
    140  */
    141 static void
    142 taskq_dispatch_common(taskq_t *tq, taskq_ent_t *tqe, uint_t flags)
    143 {
    144 	int i;
    145 
    146 	KASSERT(mutex_owned(&tq->tq_lock));
    147 
    148 	if (ISSET(flags, TQ_FRONT))
    149 		SIMPLEQ_INSERT_HEAD(&tq->tq_list, tqe, tqent_list);
    150 	else
    151 		SIMPLEQ_INSERT_TAIL(&tq->tq_list, tqe, tqent_list);
    152 	tqe->tqent_queued = 1;
    153 	tq->tq_active++;
    154 	if (tq->tq_waiting) {
    155 		cv_signal(&tq->tq_cv);
    156 		mutex_exit(&tq->tq_lock);
    157 		return;
    158 	}
    159 	if (tq->tq_running < tq->tq_nthreads) {
    160 		for (i = 0; i < tq->tq_nthreads; i++) {
    161 			if (!tq->tq_executor[i].te_running) {
    162 				tq->tq_executor[i].te_running = 1;
    163 				tq->tq_running++;
    164 				threadpool_schedule_job(tq->tq_threadpool,
    165 				    &tq->tq_executor[i].te_job);
    166 				break;
    167 			}
    168 		}
    169 	}
    170 	mutex_exit(&tq->tq_lock);
    171 }
    172 
    173 /*
    174  * Allocate and dispatch a task entry.
    175  */
    176 taskqid_t
    177 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
    178 {
    179 	taskq_ent_t *tqe;
    180 
    181 	KASSERT(!ISSET(flags, ~(TQ_SLEEP | TQ_NOSLEEP | TQ_NOQUEUE)));
    182 	KASSERT(ISSET(tq->tq_flags, TASKQ_DYNAMIC) ||
    183 	    !ISSET(flags, TQ_NOQUEUE));
    184 
    185 	if (ISSET(flags, (TQ_SLEEP | TQ_NOSLEEP)) == TQ_NOSLEEP)
    186 		tqe = kmem_alloc(sizeof(*tqe), KM_NOSLEEP);
    187 	else
    188 		tqe = kmem_alloc(sizeof(*tqe), KM_SLEEP);
    189 	if (tqe == NULL)
    190 		return (taskqid_t) NULL;
    191 
    192 	mutex_enter(&tq->tq_lock);
    193 	if (ISSET(flags, TQ_NOQUEUE) && tq->tq_active == tq->tq_nthreads) {
    194 		mutex_exit(&tq->tq_lock);
    195 		kmem_free(tqe, sizeof(*tqe));
    196 		return (taskqid_t) NULL;
    197 	}
    198 	tqe->tqent_dynamic = 1;
    199 	tqe->tqent_queued = 0;
    200 	tqe->tqent_func = func;
    201 	tqe->tqent_arg = arg;
    202 	taskq_dispatch_common(tq, tqe, flags);
    203 
    204 	return (taskqid_t) tqe;
    205 }
    206 
    207 /*
    208  * Dispatch a preallocated task entry.
    209  * Assume caller zeroed it.
    210  */
    211 void
    212 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    213     taskq_ent_t *tqe)
    214 {
    215 
    216 	KASSERT(!ISSET(flags, ~(TQ_FRONT)));
    217 
    218 	tqe->tqent_func = func;
    219 	tqe->tqent_arg = arg;
    220 	mutex_enter(&tq->tq_lock);
    221 	taskq_dispatch_common(tq, tqe, flags);
    222 }
    223 
    224 /*
    225  * Wait until all tasks have completed.
    226  */
    227 void
    228 taskq_wait(taskq_t *tq)
    229 {
    230 
    231 	KASSERT(!taskq_member(tq, curlwp));
    232 
    233 	mutex_enter(&tq->tq_lock);
    234 	while (tq->tq_active)
    235 		kpause("qwait", false, 1, &tq->tq_lock);
    236 	mutex_exit(&tq->tq_lock);
    237 }
    238 
    239 /*
    240  * True if the current thread is an executor for this queue.
    241  */
    242 int
    243 taskq_member(taskq_t *tq, kthread_t *thread)
    244 {
    245 
    246 	KASSERT(thread == curlwp);
    247 
    248 	return (lwp_getspecific(taskq_lwp_key) == tq);
    249 }
    250 
    251 /*
    252  * Create a task queue.
    253  * Allocation hints are ignored.
    254  */
    255 taskq_t *
    256 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc,
    257     int maxalloc, uint_t flags)
    258 {
    259 	int i;
    260 	struct threadpool *threadpool;
    261 	taskq_t *tq;
    262 
    263 	KASSERT(!ISSET(flags,
    264 	    ~(TASKQ_DYNAMIC | TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT)));
    265 
    266 	if (threadpool_get(&threadpool, pri) != 0)
    267 		return NULL;
    268 
    269 	if (ISSET(flags, TASKQ_THREADS_CPU_PCT))
    270 		nthreads = MAX((ncpu * nthreads) / 100, 1);
    271 
    272 	tq = kmem_zalloc(sizeof(*tq), KM_SLEEP);
    273 	tq->tq_nthreads = nthreads;
    274 	tq->tq_pri = pri;
    275 	tq->tq_flags = flags;
    276 	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, IPL_NONE);
    277 	cv_init(&tq->tq_cv, NULL, CV_DEFAULT, NULL);
    278 	SIMPLEQ_INIT(&tq->tq_list);
    279 	tq->tq_executor = kmem_alloc(sizeof(*tq->tq_executor) * nthreads,
    280 	    KM_SLEEP);
    281 	for (i = 0; i < nthreads; i++) {
    282 		threadpool_job_init(&tq->tq_executor[i].te_job, task_executor,
    283 		    &tq->tq_lock, "%s/%d", name, i);
    284 		tq->tq_executor[i].te_self = tq;
    285 		tq->tq_executor[i].te_running = 0;
    286 	}
    287 	tq->tq_threadpool = threadpool;
    288 
    289 	return tq;
    290 }
    291 
    292 taskq_t *
    293 taskq_create_proc(const char *name, int nthreads, pri_t pri, int minalloc,
    294     int maxalloc, struct proc *proc, uint_t flags)
    295 {
    296 
    297 	return taskq_create(name, nthreads, pri, minalloc, maxalloc, flags);
    298 }
    299 
    300 /*
    301  * Destroy a task queue.
    302  */
    303 void
    304 taskq_destroy(taskq_t *tq)
    305 {
    306 	int i;
    307 	taskq_ent_t *tqe;
    308 
    309 	KASSERT(!taskq_member(tq, curlwp));
    310 
    311 	/* Wait for tasks to complete. */
    312 	taskq_wait(tq);
    313 
    314 	/* Mark destroyed and ask running executors to quit. */
    315 	mutex_enter(&tq->tq_lock);
    316 	tq->tq_destroyed = 1;
    317 	cv_broadcast(&tq->tq_cv);
    318 
    319 	/* Wait for all executors to quit. */
    320 	while (tq->tq_running > 0)
    321 		kpause("tqdestroy", false, 1, &tq->tq_lock);
    322 	mutex_exit(&tq->tq_lock);
    323 
    324 	for (i = 0; i < tq->tq_nthreads; i++)
    325 		threadpool_job_destroy(&tq->tq_executor[i].te_job);
    326 	threadpool_put(tq->tq_threadpool, tq->tq_pri);
    327 	mutex_destroy(&tq->tq_lock);
    328 	cv_destroy(&tq->tq_cv);
    329 	kmem_free(tq->tq_executor, sizeof(*tq->tq_executor) * tq->tq_nthreads);
    330 	kmem_free(tq, sizeof(*tq));
    331 }
    332