101e04c3fSmrg/* 201e04c3fSmrg * Copyright © 2016 Advanced Micro Devices, Inc. 301e04c3fSmrg * All Rights Reserved. 401e04c3fSmrg * 501e04c3fSmrg * Permission is hereby granted, free of charge, to any person obtaining 601e04c3fSmrg * a copy of this software and associated documentation files (the 701e04c3fSmrg * "Software"), to deal in the Software without restriction, including 801e04c3fSmrg * without limitation the rights to use, copy, modify, merge, publish, 901e04c3fSmrg * distribute, sub license, and/or sell copies of the Software, and to 1001e04c3fSmrg * permit persons to whom the Software is furnished to do so, subject to 1101e04c3fSmrg * the following conditions: 1201e04c3fSmrg * 1301e04c3fSmrg * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 1401e04c3fSmrg * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 1501e04c3fSmrg * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 1601e04c3fSmrg * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS 1701e04c3fSmrg * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 1801e04c3fSmrg * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 1901e04c3fSmrg * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 2001e04c3fSmrg * USE OR OTHER DEALINGS IN THE SOFTWARE. 2101e04c3fSmrg * 2201e04c3fSmrg * The above copyright notice and this permission notice (including the 2301e04c3fSmrg * next paragraph) shall be included in all copies or substantial portions 2401e04c3fSmrg * of the Software. 2501e04c3fSmrg */ 2601e04c3fSmrg 2701e04c3fSmrg#include "u_queue.h" 2801e04c3fSmrg 291463c08dSmrg#include "c11/threads.h" 301463c08dSmrg#include "util/u_cpu_detect.h" 3101e04c3fSmrg#include "util/os_time.h" 3201e04c3fSmrg#include "util/u_string.h" 3301e04c3fSmrg#include "util/u_thread.h" 3401e04c3fSmrg#include "u_process.h" 3501e04c3fSmrg 361463c08dSmrg#if defined(__linux__) 371463c08dSmrg#include <sys/time.h> 381463c08dSmrg#include <sys/resource.h> 391463c08dSmrg#include <sys/syscall.h> 401463c08dSmrg#endif 411463c08dSmrg 421463c08dSmrg 431463c08dSmrg/* Define 256MB */ 441463c08dSmrg#define S_256MB (256 * 1024 * 1024) 451463c08dSmrg 46d8407755Smayastatic void 47d8407755Smayautil_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, 48d8407755Smaya bool finish_locked); 4901e04c3fSmrg 5001e04c3fSmrg/**************************************************************************** 5101e04c3fSmrg * Wait for all queues to assert idle when exit() is called. 5201e04c3fSmrg * 5301e04c3fSmrg * Otherwise, C++ static variable destructors can be called while threads 5401e04c3fSmrg * are using the static variables. 5501e04c3fSmrg */ 5601e04c3fSmrg 5701e04c3fSmrgstatic once_flag atexit_once_flag = ONCE_FLAG_INIT; 5801e04c3fSmrgstatic struct list_head queue_list; 5901e04c3fSmrgstatic mtx_t exit_mutex = _MTX_INITIALIZER_NP; 6001e04c3fSmrg 61d03f69fbSmaya#define HAVE_NOATEXIT 62d03f69fbSmaya#if defined(HAVE_NOATEXIT) 637e995a2eSmrgstatic int global_init_called = 0; 647e995a2eSmrg 657e995a2eSmrgstatic void __attribute__((__destructor__)) 66d03f69fbSmaya#else 67d03f69fbSmayastatic void 68d03f69fbSmaya#endif 6901e04c3fSmrgatexit_handler(void) 7001e04c3fSmrg{ 7101e04c3fSmrg struct util_queue *iter; 7201e04c3fSmrg 73d03f69fbSmaya#if defined(HAVE_NOATEXIT) 747e995a2eSmrg if (!global_init_called) 757e995a2eSmrg return; 76d03f69fbSmaya#endif 777e995a2eSmrg 7801e04c3fSmrg mtx_lock(&exit_mutex); 7901e04c3fSmrg /* Wait for all queues to assert idle. */ 8001e04c3fSmrg LIST_FOR_EACH_ENTRY(iter, &queue_list, head) { 81d8407755Smaya util_queue_kill_threads(iter, 0, false); 8201e04c3fSmrg } 8301e04c3fSmrg mtx_unlock(&exit_mutex); 8401e04c3fSmrg} 8501e04c3fSmrg 8601e04c3fSmrgstatic void 8701e04c3fSmrgglobal_init(void) 8801e04c3fSmrg{ 891463c08dSmrg list_inithead(&queue_list); 90d03f69fbSmaya#if defined(HAVE_NOATEXIT) 917e995a2eSmrg global_init_called = 1; 92d03f69fbSmaya#else 93d03f69fbSmaya atexit(atexit_handler); 94d03f69fbSmaya#endif 9501e04c3fSmrg} 9601e04c3fSmrg 9701e04c3fSmrgstatic void 9801e04c3fSmrgadd_to_atexit_list(struct util_queue *queue) 9901e04c3fSmrg{ 10001e04c3fSmrg call_once(&atexit_once_flag, global_init); 10101e04c3fSmrg 10201e04c3fSmrg mtx_lock(&exit_mutex); 1031463c08dSmrg list_add(&queue->head, &queue_list); 10401e04c3fSmrg mtx_unlock(&exit_mutex); 10501e04c3fSmrg} 10601e04c3fSmrg 10701e04c3fSmrgstatic void 10801e04c3fSmrgremove_from_atexit_list(struct util_queue *queue) 10901e04c3fSmrg{ 11001e04c3fSmrg struct util_queue *iter, *tmp; 11101e04c3fSmrg 11201e04c3fSmrg mtx_lock(&exit_mutex); 11301e04c3fSmrg LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) { 11401e04c3fSmrg if (iter == queue) { 1151463c08dSmrg list_del(&iter->head); 11601e04c3fSmrg break; 11701e04c3fSmrg } 11801e04c3fSmrg } 11901e04c3fSmrg mtx_unlock(&exit_mutex); 12001e04c3fSmrg} 12101e04c3fSmrg 12201e04c3fSmrg/**************************************************************************** 12301e04c3fSmrg * util_queue_fence 12401e04c3fSmrg */ 12501e04c3fSmrg 12601e04c3fSmrg#ifdef UTIL_QUEUE_FENCE_FUTEX 12701e04c3fSmrgstatic bool 12801e04c3fSmrgdo_futex_fence_wait(struct util_queue_fence *fence, 12901e04c3fSmrg bool timeout, int64_t abs_timeout) 13001e04c3fSmrg{ 1311463c08dSmrg uint32_t v = p_atomic_read_relaxed(&fence->val); 13201e04c3fSmrg struct timespec ts; 13301e04c3fSmrg ts.tv_sec = abs_timeout / (1000*1000*1000); 13401e04c3fSmrg ts.tv_nsec = abs_timeout % (1000*1000*1000); 13501e04c3fSmrg 13601e04c3fSmrg while (v != 0) { 13701e04c3fSmrg if (v != 2) { 13801e04c3fSmrg v = p_atomic_cmpxchg(&fence->val, 1, 2); 13901e04c3fSmrg if (v == 0) 14001e04c3fSmrg return true; 14101e04c3fSmrg } 14201e04c3fSmrg 14301e04c3fSmrg int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL); 14401e04c3fSmrg if (timeout && r < 0) { 14501e04c3fSmrg if (errno == ETIMEDOUT) 14601e04c3fSmrg return false; 14701e04c3fSmrg } 14801e04c3fSmrg 1491463c08dSmrg v = p_atomic_read_relaxed(&fence->val); 15001e04c3fSmrg } 15101e04c3fSmrg 15201e04c3fSmrg return true; 15301e04c3fSmrg} 15401e04c3fSmrg 15501e04c3fSmrgvoid 15601e04c3fSmrg_util_queue_fence_wait(struct util_queue_fence *fence) 15701e04c3fSmrg{ 15801e04c3fSmrg do_futex_fence_wait(fence, false, 0); 15901e04c3fSmrg} 16001e04c3fSmrg 16101e04c3fSmrgbool 16201e04c3fSmrg_util_queue_fence_wait_timeout(struct util_queue_fence *fence, 16301e04c3fSmrg int64_t abs_timeout) 16401e04c3fSmrg{ 16501e04c3fSmrg return do_futex_fence_wait(fence, true, abs_timeout); 16601e04c3fSmrg} 16701e04c3fSmrg 16801e04c3fSmrg#endif 16901e04c3fSmrg 17001e04c3fSmrg#ifdef UTIL_QUEUE_FENCE_STANDARD 17101e04c3fSmrgvoid 17201e04c3fSmrgutil_queue_fence_signal(struct util_queue_fence *fence) 17301e04c3fSmrg{ 17401e04c3fSmrg mtx_lock(&fence->mutex); 17501e04c3fSmrg fence->signalled = true; 17601e04c3fSmrg cnd_broadcast(&fence->cond); 17701e04c3fSmrg mtx_unlock(&fence->mutex); 17801e04c3fSmrg} 17901e04c3fSmrg 18001e04c3fSmrgvoid 18101e04c3fSmrg_util_queue_fence_wait(struct util_queue_fence *fence) 18201e04c3fSmrg{ 18301e04c3fSmrg mtx_lock(&fence->mutex); 18401e04c3fSmrg while (!fence->signalled) 18501e04c3fSmrg cnd_wait(&fence->cond, &fence->mutex); 18601e04c3fSmrg mtx_unlock(&fence->mutex); 18701e04c3fSmrg} 18801e04c3fSmrg 18901e04c3fSmrgbool 19001e04c3fSmrg_util_queue_fence_wait_timeout(struct util_queue_fence *fence, 19101e04c3fSmrg int64_t abs_timeout) 19201e04c3fSmrg{ 19301e04c3fSmrg /* This terrible hack is made necessary by the fact that we really want an 19401e04c3fSmrg * internal interface consistent with os_time_*, but cnd_timedwait is spec'd 19501e04c3fSmrg * to be relative to the TIME_UTC clock. 19601e04c3fSmrg */ 19701e04c3fSmrg int64_t rel = abs_timeout - os_time_get_nano(); 19801e04c3fSmrg 19901e04c3fSmrg if (rel > 0) { 20001e04c3fSmrg struct timespec ts; 20101e04c3fSmrg 2021463c08dSmrg#if defined(HAVE_TIMESPEC_GET) || defined(_WIN32) 20301e04c3fSmrg timespec_get(&ts, TIME_UTC); 2041463c08dSmrg#else 2051463c08dSmrg clock_gettime(CLOCK_REALTIME, &ts); 2061463c08dSmrg#endif 20701e04c3fSmrg 20801e04c3fSmrg ts.tv_sec += abs_timeout / (1000*1000*1000); 20901e04c3fSmrg ts.tv_nsec += abs_timeout % (1000*1000*1000); 21001e04c3fSmrg if (ts.tv_nsec >= (1000*1000*1000)) { 21101e04c3fSmrg ts.tv_sec++; 21201e04c3fSmrg ts.tv_nsec -= (1000*1000*1000); 21301e04c3fSmrg } 21401e04c3fSmrg 21501e04c3fSmrg mtx_lock(&fence->mutex); 21601e04c3fSmrg while (!fence->signalled) { 21701e04c3fSmrg if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success) 21801e04c3fSmrg break; 21901e04c3fSmrg } 22001e04c3fSmrg mtx_unlock(&fence->mutex); 22101e04c3fSmrg } 22201e04c3fSmrg 22301e04c3fSmrg return fence->signalled; 22401e04c3fSmrg} 22501e04c3fSmrg 22601e04c3fSmrgvoid 22701e04c3fSmrgutil_queue_fence_init(struct util_queue_fence *fence) 22801e04c3fSmrg{ 22901e04c3fSmrg memset(fence, 0, sizeof(*fence)); 23001e04c3fSmrg (void) mtx_init(&fence->mutex, mtx_plain); 23101e04c3fSmrg cnd_init(&fence->cond); 23201e04c3fSmrg fence->signalled = true; 23301e04c3fSmrg} 23401e04c3fSmrg 23501e04c3fSmrgvoid 23601e04c3fSmrgutil_queue_fence_destroy(struct util_queue_fence *fence) 23701e04c3fSmrg{ 23801e04c3fSmrg assert(fence->signalled); 23901e04c3fSmrg 24001e04c3fSmrg /* Ensure that another thread is not in the middle of 24101e04c3fSmrg * util_queue_fence_signal (having set the fence to signalled but still 24201e04c3fSmrg * holding the fence mutex). 24301e04c3fSmrg * 24401e04c3fSmrg * A common contract between threads is that as soon as a fence is signalled 24501e04c3fSmrg * by thread A, thread B is allowed to destroy it. Since 24601e04c3fSmrg * util_queue_fence_is_signalled does not lock the fence mutex (for 24701e04c3fSmrg * performance reasons), we must do so here. 24801e04c3fSmrg */ 24901e04c3fSmrg mtx_lock(&fence->mutex); 25001e04c3fSmrg mtx_unlock(&fence->mutex); 25101e04c3fSmrg 25201e04c3fSmrg cnd_destroy(&fence->cond); 25301e04c3fSmrg mtx_destroy(&fence->mutex); 25401e04c3fSmrg} 25501e04c3fSmrg#endif 25601e04c3fSmrg 25701e04c3fSmrg/**************************************************************************** 25801e04c3fSmrg * util_queue implementation 25901e04c3fSmrg */ 26001e04c3fSmrg 26101e04c3fSmrgstruct thread_input { 26201e04c3fSmrg struct util_queue *queue; 26301e04c3fSmrg int thread_index; 26401e04c3fSmrg}; 26501e04c3fSmrg 26601e04c3fSmrgstatic int 26701e04c3fSmrgutil_queue_thread_func(void *input) 26801e04c3fSmrg{ 26901e04c3fSmrg struct util_queue *queue = ((struct thread_input*)input)->queue; 27001e04c3fSmrg int thread_index = ((struct thread_input*)input)->thread_index; 27101e04c3fSmrg 27201e04c3fSmrg free(input); 27301e04c3fSmrg 27401e04c3fSmrg if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) { 27501e04c3fSmrg /* Don't inherit the thread affinity from the parent thread. 27601e04c3fSmrg * Set the full mask. 27701e04c3fSmrg */ 2781463c08dSmrg uint32_t mask[UTIL_MAX_CPUS / 32]; 27901e04c3fSmrg 2801463c08dSmrg memset(mask, 0xff, sizeof(mask)); 2811463c08dSmrg 2821463c08dSmrg /* Ensure util_cpu_caps.num_cpu_mask_bits is initialized: */ 2831463c08dSmrg util_cpu_detect(); 2841463c08dSmrg 2851463c08dSmrg util_set_current_thread_affinity(mask, NULL, 2861463c08dSmrg util_get_cpu_caps()->num_cpu_mask_bits); 2871463c08dSmrg } 2881463c08dSmrg 2891463c08dSmrg#if defined(__linux__) 2901463c08dSmrg if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) { 2911463c08dSmrg /* The nice() function can only set a maximum of 19. */ 2921463c08dSmrg setpriority(PRIO_PROCESS, syscall(SYS_gettid), 19); 29301e04c3fSmrg } 29401e04c3fSmrg#endif 29501e04c3fSmrg 29601e04c3fSmrg if (strlen(queue->name) > 0) { 29701e04c3fSmrg char name[16]; 2981463c08dSmrg snprintf(name, sizeof(name), "%s%i", queue->name, thread_index); 29901e04c3fSmrg u_thread_setname(name); 30001e04c3fSmrg } 30101e04c3fSmrg 30201e04c3fSmrg while (1) { 30301e04c3fSmrg struct util_queue_job job; 30401e04c3fSmrg 30501e04c3fSmrg mtx_lock(&queue->lock); 30601e04c3fSmrg assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); 30701e04c3fSmrg 30801e04c3fSmrg /* wait if the queue is empty */ 309d8407755Smaya while (thread_index < queue->num_threads && queue->num_queued == 0) 31001e04c3fSmrg cnd_wait(&queue->has_queued_cond, &queue->lock); 31101e04c3fSmrg 312d8407755Smaya /* only kill threads that are above "num_threads" */ 313d8407755Smaya if (thread_index >= queue->num_threads) { 31401e04c3fSmrg mtx_unlock(&queue->lock); 31501e04c3fSmrg break; 31601e04c3fSmrg } 31701e04c3fSmrg 31801e04c3fSmrg job = queue->jobs[queue->read_idx]; 31901e04c3fSmrg memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job)); 32001e04c3fSmrg queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; 32101e04c3fSmrg 32201e04c3fSmrg queue->num_queued--; 32301e04c3fSmrg cnd_signal(&queue->has_space_cond); 3241463c08dSmrg if (job.job) 3251463c08dSmrg queue->total_jobs_size -= job.job_size; 32601e04c3fSmrg mtx_unlock(&queue->lock); 32701e04c3fSmrg 32801e04c3fSmrg if (job.job) { 3291463c08dSmrg job.execute(job.job, job.global_data, thread_index); 3301463c08dSmrg if (job.fence) 3311463c08dSmrg util_queue_fence_signal(job.fence); 33201e04c3fSmrg if (job.cleanup) 3331463c08dSmrg job.cleanup(job.job, job.global_data, thread_index); 33401e04c3fSmrg } 33501e04c3fSmrg } 33601e04c3fSmrg 337d8407755Smaya /* signal remaining jobs if all threads are being terminated */ 33801e04c3fSmrg mtx_lock(&queue->lock); 339d8407755Smaya if (queue->num_threads == 0) { 340d8407755Smaya for (unsigned i = queue->read_idx; i != queue->write_idx; 341d8407755Smaya i = (i + 1) % queue->max_jobs) { 342d8407755Smaya if (queue->jobs[i].job) { 3431463c08dSmrg if (queue->jobs[i].fence) 3441463c08dSmrg util_queue_fence_signal(queue->jobs[i].fence); 345d8407755Smaya queue->jobs[i].job = NULL; 346d8407755Smaya } 34701e04c3fSmrg } 348d8407755Smaya queue->read_idx = queue->write_idx; 349d8407755Smaya queue->num_queued = 0; 35001e04c3fSmrg } 35101e04c3fSmrg mtx_unlock(&queue->lock); 35201e04c3fSmrg return 0; 35301e04c3fSmrg} 35401e04c3fSmrg 355d8407755Smayastatic bool 356d8407755Smayautil_queue_create_thread(struct util_queue *queue, unsigned index) 357d8407755Smaya{ 358d8407755Smaya struct thread_input *input = 359d8407755Smaya (struct thread_input *) malloc(sizeof(struct thread_input)); 360d8407755Smaya input->queue = queue; 361d8407755Smaya input->thread_index = index; 362d8407755Smaya 363d8407755Smaya queue->threads[index] = u_thread_create(util_queue_thread_func, input); 364d8407755Smaya 365d8407755Smaya if (!queue->threads[index]) { 366d8407755Smaya free(input); 367d8407755Smaya return false; 368d8407755Smaya } 369d8407755Smaya 370d8407755Smaya if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) { 3711463c08dSmrg#if defined(__linux__) && defined(SCHED_BATCH) 372d8407755Smaya struct sched_param sched_param = {0}; 373d8407755Smaya 374d8407755Smaya /* The nice() function can only set a maximum of 19. 3751463c08dSmrg * SCHED_BATCH gives the scheduler a hint that this is a latency 3761463c08dSmrg * insensitive thread. 377d8407755Smaya * 378d8407755Smaya * Note that Linux only allows decreasing the priority. The original 379d8407755Smaya * priority can't be restored. 380d8407755Smaya */ 3811463c08dSmrg pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param); 382d8407755Smaya#endif 383d8407755Smaya } 384d8407755Smaya return true; 385d8407755Smaya} 386d8407755Smaya 387d8407755Smayavoid 388d8407755Smayautil_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads) 389d8407755Smaya{ 390d8407755Smaya num_threads = MIN2(num_threads, queue->max_threads); 391d8407755Smaya num_threads = MAX2(num_threads, 1); 392d8407755Smaya 3931463c08dSmrg simple_mtx_lock(&queue->finish_lock); 394d8407755Smaya unsigned old_num_threads = queue->num_threads; 395d8407755Smaya 396d8407755Smaya if (num_threads == old_num_threads) { 3971463c08dSmrg simple_mtx_unlock(&queue->finish_lock); 398d8407755Smaya return; 399d8407755Smaya } 400d8407755Smaya 401d8407755Smaya if (num_threads < old_num_threads) { 402d8407755Smaya util_queue_kill_threads(queue, num_threads, true); 4031463c08dSmrg simple_mtx_unlock(&queue->finish_lock); 404d8407755Smaya return; 405d8407755Smaya } 406d8407755Smaya 407d8407755Smaya /* Create threads. 408d8407755Smaya * 409d8407755Smaya * We need to update num_threads first, because threads terminate 410d8407755Smaya * when thread_index < num_threads. 411d8407755Smaya */ 412d8407755Smaya queue->num_threads = num_threads; 413d8407755Smaya for (unsigned i = old_num_threads; i < num_threads; i++) { 414d8407755Smaya if (!util_queue_create_thread(queue, i)) 415d8407755Smaya break; 416d8407755Smaya } 4171463c08dSmrg simple_mtx_unlock(&queue->finish_lock); 418d8407755Smaya} 419d8407755Smaya 42001e04c3fSmrgbool 42101e04c3fSmrgutil_queue_init(struct util_queue *queue, 42201e04c3fSmrg const char *name, 42301e04c3fSmrg unsigned max_jobs, 42401e04c3fSmrg unsigned num_threads, 4251463c08dSmrg unsigned flags, 4261463c08dSmrg void *global_data) 42701e04c3fSmrg{ 42801e04c3fSmrg unsigned i; 42901e04c3fSmrg 43001e04c3fSmrg /* Form the thread name from process_name and name, limited to 13 43101e04c3fSmrg * characters. Characters 14-15 are reserved for the thread number. 43201e04c3fSmrg * Character 16 should be 0. Final form: "process:name12" 43301e04c3fSmrg * 43401e04c3fSmrg * If name is too long, it's truncated. If any space is left, the process 43501e04c3fSmrg * name fills it. 43601e04c3fSmrg */ 43701e04c3fSmrg const char *process_name = util_get_process_name(); 43801e04c3fSmrg int process_len = process_name ? strlen(process_name) : 0; 43901e04c3fSmrg int name_len = strlen(name); 44001e04c3fSmrg const int max_chars = sizeof(queue->name) - 1; 44101e04c3fSmrg 44201e04c3fSmrg name_len = MIN2(name_len, max_chars); 44301e04c3fSmrg 44401e04c3fSmrg /* See if there is any space left for the process name, reserve 1 for 44501e04c3fSmrg * the colon. */ 44601e04c3fSmrg process_len = MIN2(process_len, max_chars - name_len - 1); 44701e04c3fSmrg process_len = MAX2(process_len, 0); 44801e04c3fSmrg 44901e04c3fSmrg memset(queue, 0, sizeof(*queue)); 45001e04c3fSmrg 45101e04c3fSmrg if (process_len) { 4521463c08dSmrg snprintf(queue->name, sizeof(queue->name), "%.*s:%s", 4531463c08dSmrg process_len, process_name, name); 45401e04c3fSmrg } else { 4551463c08dSmrg snprintf(queue->name, sizeof(queue->name), "%s", name); 45601e04c3fSmrg } 45701e04c3fSmrg 45801e04c3fSmrg queue->flags = flags; 459d8407755Smaya queue->max_threads = num_threads; 4601463c08dSmrg queue->num_threads = (flags & UTIL_QUEUE_INIT_SCALE_THREADS) ? 1 : num_threads; 46101e04c3fSmrg queue->max_jobs = max_jobs; 4621463c08dSmrg queue->global_data = global_data; 46301e04c3fSmrg 46401e04c3fSmrg (void) mtx_init(&queue->lock, mtx_plain); 4651463c08dSmrg (void) simple_mtx_init(&queue->finish_lock, mtx_plain); 46601e04c3fSmrg 46701e04c3fSmrg queue->num_queued = 0; 46801e04c3fSmrg cnd_init(&queue->has_queued_cond); 46901e04c3fSmrg cnd_init(&queue->has_space_cond); 47001e04c3fSmrg 4711463c08dSmrg queue->jobs = (struct util_queue_job*) 4721463c08dSmrg calloc(max_jobs, sizeof(struct util_queue_job)); 4731463c08dSmrg if (!queue->jobs) 4741463c08dSmrg goto fail; 4751463c08dSmrg 4761463c08dSmrg queue->threads = (thrd_t*) calloc(queue->max_threads, sizeof(thrd_t)); 47701e04c3fSmrg if (!queue->threads) 47801e04c3fSmrg goto fail; 47901e04c3fSmrg 48001e04c3fSmrg /* start threads */ 4811463c08dSmrg for (i = 0; i < queue->num_threads; i++) { 482d8407755Smaya if (!util_queue_create_thread(queue, i)) { 48301e04c3fSmrg if (i == 0) { 48401e04c3fSmrg /* no threads created, fail */ 48501e04c3fSmrg goto fail; 48601e04c3fSmrg } else { 48701e04c3fSmrg /* at least one thread created, so use it */ 48801e04c3fSmrg queue->num_threads = i; 48901e04c3fSmrg break; 49001e04c3fSmrg } 49101e04c3fSmrg } 49201e04c3fSmrg } 49301e04c3fSmrg 49401e04c3fSmrg add_to_atexit_list(queue); 49501e04c3fSmrg return true; 49601e04c3fSmrg 49701e04c3fSmrgfail: 49801e04c3fSmrg free(queue->threads); 49901e04c3fSmrg 50001e04c3fSmrg if (queue->jobs) { 50101e04c3fSmrg cnd_destroy(&queue->has_space_cond); 50201e04c3fSmrg cnd_destroy(&queue->has_queued_cond); 50301e04c3fSmrg mtx_destroy(&queue->lock); 50401e04c3fSmrg free(queue->jobs); 50501e04c3fSmrg } 50601e04c3fSmrg /* also util_queue_is_initialized can be used to check for success */ 50701e04c3fSmrg memset(queue, 0, sizeof(*queue)); 50801e04c3fSmrg return false; 50901e04c3fSmrg} 51001e04c3fSmrg 51101e04c3fSmrgstatic void 512d8407755Smayautil_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, 513d8407755Smaya bool finish_locked) 51401e04c3fSmrg{ 51501e04c3fSmrg unsigned i; 51601e04c3fSmrg 51701e04c3fSmrg /* Signal all threads to terminate. */ 518d8407755Smaya if (!finish_locked) 5191463c08dSmrg simple_mtx_lock(&queue->finish_lock); 520d8407755Smaya 521d8407755Smaya if (keep_num_threads >= queue->num_threads) { 5221463c08dSmrg simple_mtx_unlock(&queue->finish_lock); 523d8407755Smaya return; 524d8407755Smaya } 525d8407755Smaya 52601e04c3fSmrg mtx_lock(&queue->lock); 527d8407755Smaya unsigned old_num_threads = queue->num_threads; 528d8407755Smaya /* Setting num_threads is what causes the threads to terminate. 529d8407755Smaya * Then cnd_broadcast wakes them up and they will exit their function. 530d8407755Smaya */ 531d8407755Smaya queue->num_threads = keep_num_threads; 53201e04c3fSmrg cnd_broadcast(&queue->has_queued_cond); 53301e04c3fSmrg mtx_unlock(&queue->lock); 53401e04c3fSmrg 535d8407755Smaya for (i = keep_num_threads; i < old_num_threads; i++) 53601e04c3fSmrg thrd_join(queue->threads[i], NULL); 537d8407755Smaya 538d8407755Smaya if (!finish_locked) 5391463c08dSmrg simple_mtx_unlock(&queue->finish_lock); 5401463c08dSmrg} 5411463c08dSmrg 5421463c08dSmrgstatic void 5431463c08dSmrgutil_queue_finish_execute(void *data, void *gdata, int num_thread) 5441463c08dSmrg{ 5451463c08dSmrg util_barrier *barrier = data; 5461463c08dSmrg util_barrier_wait(barrier); 54701e04c3fSmrg} 54801e04c3fSmrg 54901e04c3fSmrgvoid 55001e04c3fSmrgutil_queue_destroy(struct util_queue *queue) 55101e04c3fSmrg{ 552d8407755Smaya util_queue_kill_threads(queue, 0, false); 5531463c08dSmrg 5541463c08dSmrg /* This makes it safe to call on a queue that failedutil_queue_init. */ 5551463c08dSmrg if (queue->head.next != NULL) 5561463c08dSmrg remove_from_atexit_list(queue); 55701e04c3fSmrg 55801e04c3fSmrg cnd_destroy(&queue->has_space_cond); 55901e04c3fSmrg cnd_destroy(&queue->has_queued_cond); 5601463c08dSmrg simple_mtx_destroy(&queue->finish_lock); 56101e04c3fSmrg mtx_destroy(&queue->lock); 56201e04c3fSmrg free(queue->jobs); 56301e04c3fSmrg free(queue->threads); 56401e04c3fSmrg} 56501e04c3fSmrg 56601e04c3fSmrgvoid 56701e04c3fSmrgutil_queue_add_job(struct util_queue *queue, 56801e04c3fSmrg void *job, 56901e04c3fSmrg struct util_queue_fence *fence, 57001e04c3fSmrg util_queue_execute_func execute, 5711463c08dSmrg util_queue_execute_func cleanup, 5721463c08dSmrg const size_t job_size) 57301e04c3fSmrg{ 57401e04c3fSmrg struct util_queue_job *ptr; 57501e04c3fSmrg 57601e04c3fSmrg mtx_lock(&queue->lock); 577d8407755Smaya if (queue->num_threads == 0) { 57801e04c3fSmrg mtx_unlock(&queue->lock); 57901e04c3fSmrg /* well no good option here, but any leaks will be 58001e04c3fSmrg * short-lived as things are shutting down.. 58101e04c3fSmrg */ 58201e04c3fSmrg return; 58301e04c3fSmrg } 58401e04c3fSmrg 5851463c08dSmrg if (fence) 5861463c08dSmrg util_queue_fence_reset(fence); 58701e04c3fSmrg 58801e04c3fSmrg assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); 58901e04c3fSmrg 5901463c08dSmrg 59101e04c3fSmrg if (queue->num_queued == queue->max_jobs) { 5921463c08dSmrg if ((queue->flags & UTIL_QUEUE_INIT_SCALE_THREADS) && 5931463c08dSmrg execute != util_queue_finish_execute && 5941463c08dSmrg queue->num_threads < queue->max_threads) { 5951463c08dSmrg util_queue_adjust_num_threads(queue, queue->num_threads + 1); 5961463c08dSmrg } 5971463c08dSmrg 5981463c08dSmrg if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL && 5991463c08dSmrg queue->total_jobs_size + job_size < S_256MB) { 60001e04c3fSmrg /* If the queue is full, make it larger to avoid waiting for a free 60101e04c3fSmrg * slot. 60201e04c3fSmrg */ 60301e04c3fSmrg unsigned new_max_jobs = queue->max_jobs + 8; 60401e04c3fSmrg struct util_queue_job *jobs = 60501e04c3fSmrg (struct util_queue_job*)calloc(new_max_jobs, 60601e04c3fSmrg sizeof(struct util_queue_job)); 60701e04c3fSmrg assert(jobs); 60801e04c3fSmrg 60901e04c3fSmrg /* Copy all queued jobs into the new list. */ 61001e04c3fSmrg unsigned num_jobs = 0; 61101e04c3fSmrg unsigned i = queue->read_idx; 61201e04c3fSmrg 61301e04c3fSmrg do { 61401e04c3fSmrg jobs[num_jobs++] = queue->jobs[i]; 61501e04c3fSmrg i = (i + 1) % queue->max_jobs; 61601e04c3fSmrg } while (i != queue->write_idx); 61701e04c3fSmrg 61801e04c3fSmrg assert(num_jobs == queue->num_queued); 61901e04c3fSmrg 62001e04c3fSmrg free(queue->jobs); 62101e04c3fSmrg queue->jobs = jobs; 62201e04c3fSmrg queue->read_idx = 0; 62301e04c3fSmrg queue->write_idx = num_jobs; 62401e04c3fSmrg queue->max_jobs = new_max_jobs; 62501e04c3fSmrg } else { 62601e04c3fSmrg /* Wait until there is a free slot. */ 62701e04c3fSmrg while (queue->num_queued == queue->max_jobs) 62801e04c3fSmrg cnd_wait(&queue->has_space_cond, &queue->lock); 62901e04c3fSmrg } 63001e04c3fSmrg } 63101e04c3fSmrg 63201e04c3fSmrg ptr = &queue->jobs[queue->write_idx]; 63301e04c3fSmrg assert(ptr->job == NULL); 63401e04c3fSmrg ptr->job = job; 6351463c08dSmrg ptr->global_data = queue->global_data; 63601e04c3fSmrg ptr->fence = fence; 63701e04c3fSmrg ptr->execute = execute; 63801e04c3fSmrg ptr->cleanup = cleanup; 6391463c08dSmrg ptr->job_size = job_size; 6401463c08dSmrg 64101e04c3fSmrg queue->write_idx = (queue->write_idx + 1) % queue->max_jobs; 6421463c08dSmrg queue->total_jobs_size += ptr->job_size; 64301e04c3fSmrg 64401e04c3fSmrg queue->num_queued++; 64501e04c3fSmrg cnd_signal(&queue->has_queued_cond); 64601e04c3fSmrg mtx_unlock(&queue->lock); 64701e04c3fSmrg} 64801e04c3fSmrg 64901e04c3fSmrg/** 65001e04c3fSmrg * Remove a queued job. If the job hasn't started execution, it's removed from 65101e04c3fSmrg * the queue. If the job has started execution, the function waits for it to 65201e04c3fSmrg * complete. 65301e04c3fSmrg * 65401e04c3fSmrg * In all cases, the fence is signalled when the function returns. 65501e04c3fSmrg * 65601e04c3fSmrg * The function can be used when destroying an object associated with the job 65701e04c3fSmrg * when you don't care about the job completion state. 65801e04c3fSmrg */ 65901e04c3fSmrgvoid 66001e04c3fSmrgutil_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence) 66101e04c3fSmrg{ 66201e04c3fSmrg bool removed = false; 66301e04c3fSmrg 66401e04c3fSmrg if (util_queue_fence_is_signalled(fence)) 66501e04c3fSmrg return; 66601e04c3fSmrg 66701e04c3fSmrg mtx_lock(&queue->lock); 66801e04c3fSmrg for (unsigned i = queue->read_idx; i != queue->write_idx; 66901e04c3fSmrg i = (i + 1) % queue->max_jobs) { 67001e04c3fSmrg if (queue->jobs[i].fence == fence) { 67101e04c3fSmrg if (queue->jobs[i].cleanup) 6721463c08dSmrg queue->jobs[i].cleanup(queue->jobs[i].job, queue->global_data, -1); 67301e04c3fSmrg 67401e04c3fSmrg /* Just clear it. The threads will treat as a no-op job. */ 67501e04c3fSmrg memset(&queue->jobs[i], 0, sizeof(queue->jobs[i])); 67601e04c3fSmrg removed = true; 67701e04c3fSmrg break; 67801e04c3fSmrg } 67901e04c3fSmrg } 68001e04c3fSmrg mtx_unlock(&queue->lock); 68101e04c3fSmrg 68201e04c3fSmrg if (removed) 68301e04c3fSmrg util_queue_fence_signal(fence); 68401e04c3fSmrg else 68501e04c3fSmrg util_queue_fence_wait(fence); 68601e04c3fSmrg} 68701e04c3fSmrg 68801e04c3fSmrg/** 68901e04c3fSmrg * Wait until all previously added jobs have completed. 69001e04c3fSmrg */ 69101e04c3fSmrgvoid 69201e04c3fSmrgutil_queue_finish(struct util_queue *queue) 69301e04c3fSmrg{ 69401e04c3fSmrg util_barrier barrier; 695d8407755Smaya struct util_queue_fence *fences; 69601e04c3fSmrg 69701e04c3fSmrg /* If 2 threads were adding jobs for 2 different barries at the same time, 69801e04c3fSmrg * a deadlock would happen, because 1 barrier requires that all threads 69901e04c3fSmrg * wait for it exclusively. 70001e04c3fSmrg */ 7011463c08dSmrg simple_mtx_lock(&queue->finish_lock); 7021463c08dSmrg 7031463c08dSmrg /* The number of threads can be changed to 0, e.g. by the atexit handler. */ 7041463c08dSmrg if (!queue->num_threads) { 7051463c08dSmrg simple_mtx_unlock(&queue->finish_lock); 7061463c08dSmrg return; 7071463c08dSmrg } 7081463c08dSmrg 709d8407755Smaya fences = malloc(queue->num_threads * sizeof(*fences)); 710d8407755Smaya util_barrier_init(&barrier, queue->num_threads); 71101e04c3fSmrg 71201e04c3fSmrg for (unsigned i = 0; i < queue->num_threads; ++i) { 71301e04c3fSmrg util_queue_fence_init(&fences[i]); 7141463c08dSmrg util_queue_add_job(queue, &barrier, &fences[i], 7151463c08dSmrg util_queue_finish_execute, NULL, 0); 71601e04c3fSmrg } 71701e04c3fSmrg 71801e04c3fSmrg for (unsigned i = 0; i < queue->num_threads; ++i) { 71901e04c3fSmrg util_queue_fence_wait(&fences[i]); 72001e04c3fSmrg util_queue_fence_destroy(&fences[i]); 72101e04c3fSmrg } 7221463c08dSmrg simple_mtx_unlock(&queue->finish_lock); 72301e04c3fSmrg 72401e04c3fSmrg util_barrier_destroy(&barrier); 72501e04c3fSmrg 72601e04c3fSmrg free(fences); 72701e04c3fSmrg} 72801e04c3fSmrg 72901e04c3fSmrgint64_t 73001e04c3fSmrgutil_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index) 73101e04c3fSmrg{ 73201e04c3fSmrg /* Allow some flexibility by not raising an error. */ 73301e04c3fSmrg if (thread_index >= queue->num_threads) 73401e04c3fSmrg return 0; 73501e04c3fSmrg 7361463c08dSmrg return util_thread_get_time_nano(queue->threads[thread_index]); 73701e04c3fSmrg} 738