Lines Matching defs:job (NetBSD sys/kern/kern_threadpool.c)
37 * can be given jobs to assign to a worker thread. Scheduling a job in
76 * touching remote CPUs' memory when scheduling a job, but that still
141 "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
143 "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
145 "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
148 "struct threadpool_job *"/*job*/,
159 "struct threadpool_job *"/*job*/);
162 "struct threadpool_job *"/*job*/,
172 "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
752 threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
758 (void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
761 job->job_lock = lock;
762 job->job_thread = NULL;
763 job->job_refcnt = 0;
764 cv_init(&job->job_cv, job->job_name);
765 job->job_fn = fn;
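
threadpool_job_init() (line 752) binds a job to its work function and to a caller-supplied mutex, and names it via a printf-style format (hence the vsnprintf() at line 758). A minimal consumer sketch, assuming a hypothetical example_softc and the standard container_of() back-pointer idiom; per the assertions at lines 823-825, the work function must call threadpool_job_done() with the job lock held before it returns:

#include <sys/threadpool.h>
#include <sys/containerof.h>

/* Hypothetical consumer: one job embedded in a driver softc. */
struct example_softc {
        kmutex_t                sc_lock;
        struct threadpool_job   sc_job;
};

static void
example_job_fn(struct threadpool_job *job)
{
        struct example_softc *sc =
            container_of(job, struct example_softc, sc_job);

        /* ... do the deferred work ... */

        mutex_enter(&sc->sc_lock);
        threadpool_job_done(job);       /* job lock held, per line 823 */
        mutex_exit(&sc->sc_lock);
}

/* At attach time: */
threadpool_job_init(&sc->sc_job, example_job_fn, &sc->sc_lock,
    "examplejob");
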
769 threadpool_job_dead(struct threadpool_job *job)
772 panic("threadpool job %p ran after destruction", job);
776 threadpool_job_destroy(struct threadpool_job *job)
781 KASSERTMSG((job->job_thread == NULL), "job %p still running", job);
783 mutex_enter(job->job_lock);
784 while (0 < atomic_load_relaxed(&job->job_refcnt))
785 cv_wait(&job->job_cv, job->job_lock);
786 mutex_exit(job->job_lock);
788 job->job_lock = NULL;
789 KASSERT(job->job_thread == NULL);
790 KASSERT(job->job_refcnt == 0);
791 KASSERT(!cv_has_waiters(&job->job_cv));
792 cv_destroy(&job->job_cv);
793 job->job_fn = threadpool_job_dead;
794 (void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
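
threadpool_job_destroy() (line 776) insists the job is idle, drains any outstanding holds via the cv_wait() loop at lines 784-785, and then poisons the job: job_fn is pointed at threadpool_job_dead() (line 793) so a use-after-destroy panics (line 772). A teardown sketch with the same hypothetical softc as above; note the job lock must not be held across destroy, since destroy takes it itself at line 783:

/* Hypothetical detach path: stop the job, then destroy it. */
mutex_enter(&sc->sc_lock);
threadpool_cancel_job(pool, &sc->sc_job);       /* waits if running */
mutex_exit(&sc->sc_lock);
threadpool_job_destroy(&sc->sc_job);            /* drains refs, poisons fn */
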
798 threadpool_job_hold(struct threadpool_job *job)
802 refcnt = atomic_inc_uint_nv(&job->job_refcnt);
807 threadpool_job_rele(struct threadpool_job *job)
811 KASSERT(mutex_owned(job->job_lock));
813 refcnt = atomic_dec_uint_nv(&job->job_refcnt);
816 cv_broadcast(&job->job_cv);
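
threadpool_job_hold() (line 798) is a bare atomic increment, while threadpool_job_rele() (line 807) must run under the job lock so its wakeup pairs with the drain loop in threadpool_job_destroy() (lines 784-785). The listing elides lines 814-815; a sketch of the presumable logic between the decrement (line 813) and the broadcast (line 816), hedged since those lines are not shown:

refcnt = atomic_dec_uint_nv(&job->job_refcnt);
KASSERT(refcnt != UINT_MAX);            /* assumed underflow check */
if (refcnt == 0)                        /* assumed: wake only at zero */
        cv_broadcast(&job->job_cv);
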
820 threadpool_job_done(struct threadpool_job *job)
823 KASSERT(mutex_owned(job->job_lock));
824 KASSERT(job->job_thread != NULL);
825 KASSERT(job->job_thread->tpt_lwp == curlwp);
829 * we call the job work function, and we are only preserving it
833 curlwp->l_name = job->job_thread->tpt_lwp_savedname;
837 * Inline the work of threadpool_job_rele(); the job is already
843 KASSERT(0 < atomic_load_relaxed(&job->job_refcnt));
844 unsigned int refcnt __diagused = atomic_dec_uint_nv(&job->job_refcnt);
846 cv_broadcast(&job->job_cv);
847 job->job_thread = NULL;
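
threadpool_job_done() (line 820) undoes the lwp rename performed in the thread loop: line 1143 points curlwp->l_name at the job's name while it runs, and line 833 restores the name the pool thread saved beforehand. The two halves side by side (any locking around l_name updates, which the listing does not show, is elided):

/* In the pool thread, before running the job (line 1143): */
curlwp->l_name = job->job_name;

/* In threadpool_job_done(), on completion (line 833): */
curlwp->l_name = job->job_thread->tpt_lwp_savedname;
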
851 threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
854 KASSERT(mutex_owned(job->job_lock));
856 SDT_PROBE2(sdt, kernel, threadpool, schedule__job, pool, job);
859 * If the job's already running, let it keep running. The job
864 if (__predict_true(job->job_thread != NULL)) {
866 pool, job);
870 threadpool_job_hold(job);
872 /* Otherwise, try to assign a thread to the job. */
877 pool, job);
878 job->job_thread = &pool->tp_dispatcher;
879 TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
882 job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
884 pool, job, job->job_thread->tpt_lwp);
885 TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
887 job->job_thread->tpt_job = job;
891 KASSERT(job->job_thread != NULL);
892 cv_broadcast(&job->job_thread->tpt_cv);
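
threadpool_schedule_job() (line 851) is idempotent while the job is in flight: if job_thread is already set (line 864) it returns without requeueing; otherwise it takes a hold (line 870) and either queues the job for the dispatcher (lines 878-879) or hands it straight to an idle thread (lines 881-887). A scheduling sketch, assuming threadpool_get() from threadpool(9) to obtain the shared pool:

/* Hypothetical scheduling path. */
struct threadpool *pool;
int error;

error = threadpool_get(&pool, PRI_NONE);
if (error)
        return error;
/* ... release later with threadpool_put(pool, PRI_NONE) ... */

mutex_enter(&sc->sc_lock);              /* the job lock */
threadpool_schedule_job(pool, &sc->sc_job);
mutex_exit(&sc->sc_lock);
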
897 threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
900 KASSERT(mutex_owned(job->job_lock));
906 * => "pool" is something other than what the job was
908 * for example, a job is percpu-scheduled on CPU0
913 * => "job" is not yet running, but is assigned to the
917 * the job is already running. The failure mode is that the
918 * caller is told the job is running, and thus has to wait.
919 * The dispatcher will eventually get to it and the job will
923 if (job->job_thread == NULL) {
926 } else if (job->job_thread == &pool->tp_dispatcher) {
928 job->job_thread = NULL;
930 TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
932 threadpool_job_rele(job);
941 threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
946 * the job lock (used to interlock the cv_wait()) may in fact
951 KASSERT(mutex_owned(job->job_lock));
953 if (threadpool_cancel_job_async(pool, job))
957 while (job->job_thread != NULL)
958 cv_wait(&job->job_cv, job->job_lock);
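
threadpool_cancel_job_async() (line 897) succeeds only in the easy cases: the job is unassigned (line 923) or still parked on the dispatcher's queue (lines 926-932). Otherwise it reports failure, and the synchronous threadpool_cancel_job() (line 941) waits on job_cv until job_thread goes NULL (lines 957-958). A caller-side sketch, same assumptions as above:

/* Hypothetical cancellation, under the job lock. */
mutex_enter(&sc->sc_lock);
if (!threadpool_cancel_job_async(pool, &sc->sc_job)) {
        /* Job is (or may be) running; wait it out. */
        threadpool_cancel_job(pool, &sc->sc_job);
}
mutex_exit(&sc->sc_lock);
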
984 /* Wait until there's a job. */
1042 /* There are idle threads, so try giving one a job. */
1043 struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);
1046 * Take an extra reference on the job temporarily so that
1049 threadpool_job_hold(job);
1052 mutex_enter(job->job_lock);
1053 /* If the job was cancelled, we'll no longer be its thread. */
1054 if (__predict_true(job->job_thread == dispatcher)) {
1056 TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
1064 dispatcher__race, pool, job);
1065 TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
1069 * Assign the job to the thread and
1076 dispatcher__assign, job, thread->tpt_lwp);
1080 thread->tpt_job = job;
1081 job->job_thread = thread;
1086 threadpool_job_rele(job);
1087 mutex_exit(job->job_lock);
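
While both locks are needed here, the dispatcher cannot take the job lock with the pool lock held, so it pins the job first (line 1049), drops the pool lock, takes the job lock, and re-checks job_thread (line 1054) in case a concurrent cancel unassigned the job in the window. A skeleton of the pattern; the pool-lock calls are assumptions, since they do not match defs:job and are elided from the listing:

threadpool_job_hold(job);               /* pin across the lock dance */
mutex_spin_exit(&pool->tp_lock);        /* assumed: tp_lock is a spin lock */

mutex_enter(job->job_lock);
if (__predict_true(job->job_thread == dispatcher)) {
        /* Still ours: dequeue, then assign or requeue (lines 1056-1081). */
        mutex_spin_enter(&pool->tp_lock);
        TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
        /* ... requeue at head on race, else hand to an idle thread ... */
        mutex_spin_exit(&pool->tp_lock);
}
threadpool_job_rele(job);               /* drop the pin, under job_lock */
mutex_exit(job->job_lock);
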
1119 /* Wait until we are assigned a job
1136 struct threadpool_job *const job = thread->tpt_job;
1137 KASSERT(job != NULL);
1139 /* Set our lwp name to reflect what job we're doing. */
1143 curlwp->l_name = job->job_name;
1148 SDT_PROBE2(sdt, kernel, threadpool, thread__job, pool, job);
1150 /* Run the job. */
1151 (*job->job_fn)(job);
1159 * job after this because threadpool_job_done() drops the
1160 * last reference on the job while the job is locked.
1164 KASSERT(thread->tpt_job == job);
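
Once (*job->job_fn)(job) has been called (line 1151), the worker may no longer dereference the job: per the comment at lines 1159-1160, threadpool_job_done() drops the last reference on the job while the job is locked, after which the owner is free to destroy it. The assertion at line 1164 is therefore only a pointer comparison. A sketch of the tail of the loop, with the cleanup line an assumption about the elided context:

(*job->job_fn)(job);                    /* job_fn calls threadpool_job_done() */

KASSERT(thread->tpt_job == job);        /* compare only; never dereference */
thread->tpt_job = NULL;                 /* assumed: slot cleared for reuse */
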