1b8e80941Smrg/* 2b8e80941Smrg * Copyright © 2016 Advanced Micro Devices, Inc. 3b8e80941Smrg * All Rights Reserved. 4b8e80941Smrg * 5b8e80941Smrg * Permission is hereby granted, free of charge, to any person obtaining 6b8e80941Smrg * a copy of this software and associated documentation files (the 7b8e80941Smrg * "Software"), to deal in the Software without restriction, including 8b8e80941Smrg * without limitation the rights to use, copy, modify, merge, publish, 9b8e80941Smrg * distribute, sub license, and/or sell copies of the Software, and to 10b8e80941Smrg * permit persons to whom the Software is furnished to do so, subject to 11b8e80941Smrg * the following conditions: 12b8e80941Smrg * 13b8e80941Smrg * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 14b8e80941Smrg * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 15b8e80941Smrg * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 16b8e80941Smrg * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS 17b8e80941Smrg * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18b8e80941Smrg * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 19b8e80941Smrg * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20b8e80941Smrg * USE OR OTHER DEALINGS IN THE SOFTWARE. 21b8e80941Smrg * 22b8e80941Smrg * The above copyright notice and this permission notice (including the 23b8e80941Smrg * next paragraph) shall be included in all copies or substantial portions 24b8e80941Smrg * of the Software. 25b8e80941Smrg */ 26b8e80941Smrg 27b8e80941Smrg#include "u_queue.h" 28b8e80941Smrg 29b8e80941Smrg#include <time.h> 30b8e80941Smrg 31b8e80941Smrg#include "util/os_time.h" 32b8e80941Smrg#include "util/u_string.h" 33b8e80941Smrg#include "util/u_thread.h" 34b8e80941Smrg#include "u_process.h" 35b8e80941Smrg 36b8e80941Smrgstatic void 37b8e80941Smrgutil_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, 38b8e80941Smrg bool finish_locked); 39b8e80941Smrg 40b8e80941Smrg/**************************************************************************** 41b8e80941Smrg * Wait for all queues to assert idle when exit() is called. 42b8e80941Smrg * 43b8e80941Smrg * Otherwise, C++ static variable destructors can be called while threads 44b8e80941Smrg * are using the static variables. 45b8e80941Smrg */ 46b8e80941Smrg 47b8e80941Smrgstatic once_flag atexit_once_flag = ONCE_FLAG_INIT; 48b8e80941Smrgstatic struct list_head queue_list; 49b8e80941Smrgstatic mtx_t exit_mutex = _MTX_INITIALIZER_NP; 50b8e80941Smrg 51b8e80941Smrg#define HAVE_NOATEXIT 52b8e80941Smrg#if defined(HAVE_NOATEXIT) 53b8e80941Smrgstatic int global_init_called = 0; 54b8e80941Smrg 55b8e80941Smrgstatic void __attribute__((__destructor__)) 56b8e80941Smrg#else 57b8e80941Smrgstatic void 58b8e80941Smrg#endif 59b8e80941Smrgatexit_handler(void) 60b8e80941Smrg{ 61b8e80941Smrg struct util_queue *iter; 62b8e80941Smrg 63b8e80941Smrg#if defined(HAVE_NOATEXIT) 64b8e80941Smrg if (!global_init_called) 65b8e80941Smrg return; 66b8e80941Smrg#endif 67b8e80941Smrg 68b8e80941Smrg mtx_lock(&exit_mutex); 69b8e80941Smrg /* Wait for all queues to assert idle. */ 70b8e80941Smrg LIST_FOR_EACH_ENTRY(iter, &queue_list, head) { 71b8e80941Smrg util_queue_kill_threads(iter, 0, false); 72b8e80941Smrg } 73b8e80941Smrg mtx_unlock(&exit_mutex); 74b8e80941Smrg} 75b8e80941Smrg 76b8e80941Smrgstatic void 77b8e80941Smrgglobal_init(void) 78b8e80941Smrg{ 79b8e80941Smrg LIST_INITHEAD(&queue_list); 80b8e80941Smrg#if defined(HAVE_NOATEXIT) 81b8e80941Smrg global_init_called = 1; 82b8e80941Smrg#else 83b8e80941Smrg atexit(atexit_handler); 84b8e80941Smrg#endif 85b8e80941Smrg} 86b8e80941Smrg 87b8e80941Smrgstatic void 88b8e80941Smrgadd_to_atexit_list(struct util_queue *queue) 89b8e80941Smrg{ 90b8e80941Smrg call_once(&atexit_once_flag, global_init); 91b8e80941Smrg 92b8e80941Smrg mtx_lock(&exit_mutex); 93b8e80941Smrg LIST_ADD(&queue->head, &queue_list); 94b8e80941Smrg mtx_unlock(&exit_mutex); 95b8e80941Smrg} 96b8e80941Smrg 97b8e80941Smrgstatic void 98b8e80941Smrgremove_from_atexit_list(struct util_queue *queue) 99b8e80941Smrg{ 100b8e80941Smrg struct util_queue *iter, *tmp; 101b8e80941Smrg 102b8e80941Smrg mtx_lock(&exit_mutex); 103b8e80941Smrg LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) { 104b8e80941Smrg if (iter == queue) { 105b8e80941Smrg LIST_DEL(&iter->head); 106b8e80941Smrg break; 107b8e80941Smrg } 108b8e80941Smrg } 109b8e80941Smrg mtx_unlock(&exit_mutex); 110b8e80941Smrg} 111b8e80941Smrg 112b8e80941Smrg/**************************************************************************** 113b8e80941Smrg * util_queue_fence 114b8e80941Smrg */ 115b8e80941Smrg 116b8e80941Smrg#ifdef UTIL_QUEUE_FENCE_FUTEX 117b8e80941Smrgstatic bool 118b8e80941Smrgdo_futex_fence_wait(struct util_queue_fence *fence, 119b8e80941Smrg bool timeout, int64_t abs_timeout) 120b8e80941Smrg{ 121b8e80941Smrg uint32_t v = fence->val; 122b8e80941Smrg struct timespec ts; 123b8e80941Smrg ts.tv_sec = abs_timeout / (1000*1000*1000); 124b8e80941Smrg ts.tv_nsec = abs_timeout % (1000*1000*1000); 125b8e80941Smrg 126b8e80941Smrg while (v != 0) { 127b8e80941Smrg if (v != 2) { 128b8e80941Smrg v = p_atomic_cmpxchg(&fence->val, 1, 2); 129b8e80941Smrg if (v == 0) 130b8e80941Smrg return true; 131b8e80941Smrg } 132b8e80941Smrg 133b8e80941Smrg int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL); 134b8e80941Smrg if (timeout && r < 0) { 135b8e80941Smrg if (errno == ETIMEDOUT) 136b8e80941Smrg return false; 137b8e80941Smrg } 138b8e80941Smrg 139b8e80941Smrg v = fence->val; 140b8e80941Smrg } 141b8e80941Smrg 142b8e80941Smrg return true; 143b8e80941Smrg} 144b8e80941Smrg 145b8e80941Smrgvoid 146b8e80941Smrg_util_queue_fence_wait(struct util_queue_fence *fence) 147b8e80941Smrg{ 148b8e80941Smrg do_futex_fence_wait(fence, false, 0); 149b8e80941Smrg} 150b8e80941Smrg 151b8e80941Smrgbool 152b8e80941Smrg_util_queue_fence_wait_timeout(struct util_queue_fence *fence, 153b8e80941Smrg int64_t abs_timeout) 154b8e80941Smrg{ 155b8e80941Smrg return do_futex_fence_wait(fence, true, abs_timeout); 156b8e80941Smrg} 157b8e80941Smrg 158b8e80941Smrg#endif 159b8e80941Smrg 160b8e80941Smrg#ifdef UTIL_QUEUE_FENCE_STANDARD 161b8e80941Smrgvoid 162b8e80941Smrgutil_queue_fence_signal(struct util_queue_fence *fence) 163b8e80941Smrg{ 164b8e80941Smrg mtx_lock(&fence->mutex); 165b8e80941Smrg fence->signalled = true; 166b8e80941Smrg cnd_broadcast(&fence->cond); 167b8e80941Smrg mtx_unlock(&fence->mutex); 168b8e80941Smrg} 169b8e80941Smrg 170b8e80941Smrgvoid 171b8e80941Smrg_util_queue_fence_wait(struct util_queue_fence *fence) 172b8e80941Smrg{ 173b8e80941Smrg mtx_lock(&fence->mutex); 174b8e80941Smrg while (!fence->signalled) 175b8e80941Smrg cnd_wait(&fence->cond, &fence->mutex); 176b8e80941Smrg mtx_unlock(&fence->mutex); 177b8e80941Smrg} 178b8e80941Smrg 179b8e80941Smrgbool 180b8e80941Smrg_util_queue_fence_wait_timeout(struct util_queue_fence *fence, 181b8e80941Smrg int64_t abs_timeout) 182b8e80941Smrg{ 183b8e80941Smrg /* This terrible hack is made necessary by the fact that we really want an 184b8e80941Smrg * internal interface consistent with os_time_*, but cnd_timedwait is spec'd 185b8e80941Smrg * to be relative to the TIME_UTC clock. 186b8e80941Smrg */ 187b8e80941Smrg int64_t rel = abs_timeout - os_time_get_nano(); 188b8e80941Smrg 189b8e80941Smrg if (rel > 0) { 190b8e80941Smrg struct timespec ts; 191b8e80941Smrg 192b8e80941Smrg timespec_get(&ts, TIME_UTC); 193b8e80941Smrg 194b8e80941Smrg ts.tv_sec += abs_timeout / (1000*1000*1000); 195b8e80941Smrg ts.tv_nsec += abs_timeout % (1000*1000*1000); 196b8e80941Smrg if (ts.tv_nsec >= (1000*1000*1000)) { 197b8e80941Smrg ts.tv_sec++; 198b8e80941Smrg ts.tv_nsec -= (1000*1000*1000); 199b8e80941Smrg } 200b8e80941Smrg 201b8e80941Smrg mtx_lock(&fence->mutex); 202b8e80941Smrg while (!fence->signalled) { 203b8e80941Smrg if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success) 204b8e80941Smrg break; 205b8e80941Smrg } 206b8e80941Smrg mtx_unlock(&fence->mutex); 207b8e80941Smrg } 208b8e80941Smrg 209b8e80941Smrg return fence->signalled; 210b8e80941Smrg} 211b8e80941Smrg 212b8e80941Smrgvoid 213b8e80941Smrgutil_queue_fence_init(struct util_queue_fence *fence) 214b8e80941Smrg{ 215b8e80941Smrg memset(fence, 0, sizeof(*fence)); 216b8e80941Smrg (void) mtx_init(&fence->mutex, mtx_plain); 217b8e80941Smrg cnd_init(&fence->cond); 218b8e80941Smrg fence->signalled = true; 219b8e80941Smrg} 220b8e80941Smrg 221b8e80941Smrgvoid 222b8e80941Smrgutil_queue_fence_destroy(struct util_queue_fence *fence) 223b8e80941Smrg{ 224b8e80941Smrg assert(fence->signalled); 225b8e80941Smrg 226b8e80941Smrg /* Ensure that another thread is not in the middle of 227b8e80941Smrg * util_queue_fence_signal (having set the fence to signalled but still 228b8e80941Smrg * holding the fence mutex). 229b8e80941Smrg * 230b8e80941Smrg * A common contract between threads is that as soon as a fence is signalled 231b8e80941Smrg * by thread A, thread B is allowed to destroy it. Since 232b8e80941Smrg * util_queue_fence_is_signalled does not lock the fence mutex (for 233b8e80941Smrg * performance reasons), we must do so here. 234b8e80941Smrg */ 235b8e80941Smrg mtx_lock(&fence->mutex); 236b8e80941Smrg mtx_unlock(&fence->mutex); 237b8e80941Smrg 238b8e80941Smrg cnd_destroy(&fence->cond); 239b8e80941Smrg mtx_destroy(&fence->mutex); 240b8e80941Smrg} 241b8e80941Smrg#endif 242b8e80941Smrg 243b8e80941Smrg/**************************************************************************** 244b8e80941Smrg * util_queue implementation 245b8e80941Smrg */ 246b8e80941Smrg 247b8e80941Smrgstruct thread_input { 248b8e80941Smrg struct util_queue *queue; 249b8e80941Smrg int thread_index; 250b8e80941Smrg}; 251b8e80941Smrg 252b8e80941Smrgstatic int 253b8e80941Smrgutil_queue_thread_func(void *input) 254b8e80941Smrg{ 255b8e80941Smrg struct util_queue *queue = ((struct thread_input*)input)->queue; 256b8e80941Smrg int thread_index = ((struct thread_input*)input)->thread_index; 257b8e80941Smrg 258b8e80941Smrg free(input); 259b8e80941Smrg 260b8e80941Smrg#ifdef HAVE_PTHREAD_SETAFFINITY 261b8e80941Smrg if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) { 262b8e80941Smrg /* Don't inherit the thread affinity from the parent thread. 263b8e80941Smrg * Set the full mask. 264b8e80941Smrg */ 265b8e80941Smrg#if defined(__NetBSD__) 266b8e80941Smrg cpuset_t *cpuset; 267b8e80941Smrg cpuset = cpuset_create(); 268b8e80941Smrg if (cpuset != NULL) { 269b8e80941Smrg cpuset_zero(cpuset); 270b8e80941Smrg for (unsigned i = 0; i < cpuset_size(cpuset); i++) 271b8e80941Smrg cpuset_set(i, cpuset); 272b8e80941Smrg 273b8e80941Smrg pthread_setaffinity_np(pthread_self(), cpuset_size(cpuset), cpuset); 274b8e80941Smrg cpuset_destroy(cpuset); 275b8e80941Smrg } 276b8e80941Smrg#else 277b8e80941Smrg cpu_set_t cpuset; 278b8e80941Smrg CPU_ZERO(&cpuset); 279b8e80941Smrg for (unsigned i = 0; i < CPU_SETSIZE; i++) 280b8e80941Smrg CPU_SET(i, &cpuset); 281b8e80941Smrg 282b8e80941Smrg pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); 283b8e80941Smrg#endif 284b8e80941Smrg } 285b8e80941Smrg#endif 286b8e80941Smrg 287b8e80941Smrg if (strlen(queue->name) > 0) { 288b8e80941Smrg char name[16]; 289b8e80941Smrg util_snprintf(name, sizeof(name), "%s%i", queue->name, thread_index); 290b8e80941Smrg u_thread_setname(name); 291b8e80941Smrg } 292b8e80941Smrg 293b8e80941Smrg while (1) { 294b8e80941Smrg struct util_queue_job job; 295b8e80941Smrg 296b8e80941Smrg mtx_lock(&queue->lock); 297b8e80941Smrg assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); 298b8e80941Smrg 299b8e80941Smrg /* wait if the queue is empty */ 300b8e80941Smrg while (thread_index < queue->num_threads && queue->num_queued == 0) 301b8e80941Smrg cnd_wait(&queue->has_queued_cond, &queue->lock); 302b8e80941Smrg 303b8e80941Smrg /* only kill threads that are above "num_threads" */ 304b8e80941Smrg if (thread_index >= queue->num_threads) { 305b8e80941Smrg mtx_unlock(&queue->lock); 306b8e80941Smrg break; 307b8e80941Smrg } 308b8e80941Smrg 309b8e80941Smrg job = queue->jobs[queue->read_idx]; 310b8e80941Smrg memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job)); 311b8e80941Smrg queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; 312b8e80941Smrg 313b8e80941Smrg queue->num_queued--; 314b8e80941Smrg cnd_signal(&queue->has_space_cond); 315b8e80941Smrg mtx_unlock(&queue->lock); 316b8e80941Smrg 317b8e80941Smrg if (job.job) { 318b8e80941Smrg job.execute(job.job, thread_index); 319b8e80941Smrg util_queue_fence_signal(job.fence); 320b8e80941Smrg if (job.cleanup) 321b8e80941Smrg job.cleanup(job.job, thread_index); 322b8e80941Smrg } 323b8e80941Smrg } 324b8e80941Smrg 325b8e80941Smrg /* signal remaining jobs if all threads are being terminated */ 326b8e80941Smrg mtx_lock(&queue->lock); 327b8e80941Smrg if (queue->num_threads == 0) { 328b8e80941Smrg for (unsigned i = queue->read_idx; i != queue->write_idx; 329b8e80941Smrg i = (i + 1) % queue->max_jobs) { 330b8e80941Smrg if (queue->jobs[i].job) { 331b8e80941Smrg util_queue_fence_signal(queue->jobs[i].fence); 332b8e80941Smrg queue->jobs[i].job = NULL; 333b8e80941Smrg } 334b8e80941Smrg } 335b8e80941Smrg queue->read_idx = queue->write_idx; 336b8e80941Smrg queue->num_queued = 0; 337b8e80941Smrg } 338b8e80941Smrg mtx_unlock(&queue->lock); 339b8e80941Smrg return 0; 340b8e80941Smrg} 341b8e80941Smrg 342b8e80941Smrgstatic bool 343b8e80941Smrgutil_queue_create_thread(struct util_queue *queue, unsigned index) 344b8e80941Smrg{ 345b8e80941Smrg struct thread_input *input = 346b8e80941Smrg (struct thread_input *) malloc(sizeof(struct thread_input)); 347b8e80941Smrg input->queue = queue; 348b8e80941Smrg input->thread_index = index; 349b8e80941Smrg 350b8e80941Smrg queue->threads[index] = u_thread_create(util_queue_thread_func, input); 351b8e80941Smrg 352b8e80941Smrg if (!queue->threads[index]) { 353b8e80941Smrg free(input); 354b8e80941Smrg return false; 355b8e80941Smrg } 356b8e80941Smrg 357b8e80941Smrg if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) { 358b8e80941Smrg#if defined(__linux__) && defined(SCHED_IDLE) 359b8e80941Smrg struct sched_param sched_param = {0}; 360b8e80941Smrg 361b8e80941Smrg /* The nice() function can only set a maximum of 19. 362b8e80941Smrg * SCHED_IDLE is the same as nice = 20. 363b8e80941Smrg * 364b8e80941Smrg * Note that Linux only allows decreasing the priority. The original 365b8e80941Smrg * priority can't be restored. 366b8e80941Smrg */ 367b8e80941Smrg pthread_setschedparam(queue->threads[index], SCHED_IDLE, &sched_param); 368b8e80941Smrg#endif 369b8e80941Smrg } 370b8e80941Smrg return true; 371b8e80941Smrg} 372b8e80941Smrg 373b8e80941Smrgvoid 374b8e80941Smrgutil_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads) 375b8e80941Smrg{ 376b8e80941Smrg num_threads = MIN2(num_threads, queue->max_threads); 377b8e80941Smrg num_threads = MAX2(num_threads, 1); 378b8e80941Smrg 379b8e80941Smrg mtx_lock(&queue->finish_lock); 380b8e80941Smrg unsigned old_num_threads = queue->num_threads; 381b8e80941Smrg 382b8e80941Smrg if (num_threads == old_num_threads) { 383b8e80941Smrg mtx_unlock(&queue->finish_lock); 384b8e80941Smrg return; 385b8e80941Smrg } 386b8e80941Smrg 387b8e80941Smrg if (num_threads < old_num_threads) { 388b8e80941Smrg util_queue_kill_threads(queue, num_threads, true); 389b8e80941Smrg mtx_unlock(&queue->finish_lock); 390b8e80941Smrg return; 391b8e80941Smrg } 392b8e80941Smrg 393b8e80941Smrg /* Create threads. 394b8e80941Smrg * 395b8e80941Smrg * We need to update num_threads first, because threads terminate 396b8e80941Smrg * when thread_index < num_threads. 397b8e80941Smrg */ 398b8e80941Smrg queue->num_threads = num_threads; 399b8e80941Smrg for (unsigned i = old_num_threads; i < num_threads; i++) { 400b8e80941Smrg if (!util_queue_create_thread(queue, i)) 401b8e80941Smrg break; 402b8e80941Smrg } 403b8e80941Smrg mtx_unlock(&queue->finish_lock); 404b8e80941Smrg} 405b8e80941Smrg 406b8e80941Smrgbool 407b8e80941Smrgutil_queue_init(struct util_queue *queue, 408b8e80941Smrg const char *name, 409b8e80941Smrg unsigned max_jobs, 410b8e80941Smrg unsigned num_threads, 411b8e80941Smrg unsigned flags) 412b8e80941Smrg{ 413b8e80941Smrg unsigned i; 414b8e80941Smrg 415b8e80941Smrg /* Form the thread name from process_name and name, limited to 13 416b8e80941Smrg * characters. Characters 14-15 are reserved for the thread number. 417b8e80941Smrg * Character 16 should be 0. Final form: "process:name12" 418b8e80941Smrg * 419b8e80941Smrg * If name is too long, it's truncated. If any space is left, the process 420b8e80941Smrg * name fills it. 421b8e80941Smrg */ 422b8e80941Smrg const char *process_name = util_get_process_name(); 423b8e80941Smrg int process_len = process_name ? strlen(process_name) : 0; 424b8e80941Smrg int name_len = strlen(name); 425b8e80941Smrg const int max_chars = sizeof(queue->name) - 1; 426b8e80941Smrg 427b8e80941Smrg name_len = MIN2(name_len, max_chars); 428b8e80941Smrg 429b8e80941Smrg /* See if there is any space left for the process name, reserve 1 for 430b8e80941Smrg * the colon. */ 431b8e80941Smrg process_len = MIN2(process_len, max_chars - name_len - 1); 432b8e80941Smrg process_len = MAX2(process_len, 0); 433b8e80941Smrg 434b8e80941Smrg memset(queue, 0, sizeof(*queue)); 435b8e80941Smrg 436b8e80941Smrg if (process_len) { 437b8e80941Smrg util_snprintf(queue->name, sizeof(queue->name), "%.*s:%s", 438b8e80941Smrg process_len, process_name, name); 439b8e80941Smrg } else { 440b8e80941Smrg util_snprintf(queue->name, sizeof(queue->name), "%s", name); 441b8e80941Smrg } 442b8e80941Smrg 443b8e80941Smrg queue->flags = flags; 444b8e80941Smrg queue->max_threads = num_threads; 445b8e80941Smrg queue->num_threads = num_threads; 446b8e80941Smrg queue->max_jobs = max_jobs; 447b8e80941Smrg 448b8e80941Smrg queue->jobs = (struct util_queue_job*) 449b8e80941Smrg calloc(max_jobs, sizeof(struct util_queue_job)); 450b8e80941Smrg if (!queue->jobs) 451b8e80941Smrg goto fail; 452b8e80941Smrg 453b8e80941Smrg (void) mtx_init(&queue->lock, mtx_plain); 454b8e80941Smrg (void) mtx_init(&queue->finish_lock, mtx_plain); 455b8e80941Smrg 456b8e80941Smrg queue->num_queued = 0; 457b8e80941Smrg cnd_init(&queue->has_queued_cond); 458b8e80941Smrg cnd_init(&queue->has_space_cond); 459b8e80941Smrg 460b8e80941Smrg queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t)); 461b8e80941Smrg if (!queue->threads) 462b8e80941Smrg goto fail; 463b8e80941Smrg 464b8e80941Smrg /* start threads */ 465b8e80941Smrg for (i = 0; i < num_threads; i++) { 466b8e80941Smrg if (!util_queue_create_thread(queue, i)) { 467b8e80941Smrg if (i == 0) { 468b8e80941Smrg /* no threads created, fail */ 469b8e80941Smrg goto fail; 470b8e80941Smrg } else { 471b8e80941Smrg /* at least one thread created, so use it */ 472b8e80941Smrg queue->num_threads = i; 473b8e80941Smrg break; 474b8e80941Smrg } 475b8e80941Smrg } 476b8e80941Smrg } 477b8e80941Smrg 478b8e80941Smrg add_to_atexit_list(queue); 479b8e80941Smrg return true; 480b8e80941Smrg 481b8e80941Smrgfail: 482b8e80941Smrg free(queue->threads); 483b8e80941Smrg 484b8e80941Smrg if (queue->jobs) { 485b8e80941Smrg cnd_destroy(&queue->has_space_cond); 486b8e80941Smrg cnd_destroy(&queue->has_queued_cond); 487b8e80941Smrg mtx_destroy(&queue->lock); 488b8e80941Smrg free(queue->jobs); 489b8e80941Smrg } 490b8e80941Smrg /* also util_queue_is_initialized can be used to check for success */ 491b8e80941Smrg memset(queue, 0, sizeof(*queue)); 492b8e80941Smrg return false; 493b8e80941Smrg} 494b8e80941Smrg 495b8e80941Smrgstatic void 496b8e80941Smrgutil_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, 497b8e80941Smrg bool finish_locked) 498b8e80941Smrg{ 499b8e80941Smrg unsigned i; 500b8e80941Smrg 501b8e80941Smrg /* Signal all threads to terminate. */ 502b8e80941Smrg if (!finish_locked) 503b8e80941Smrg mtx_lock(&queue->finish_lock); 504b8e80941Smrg 505b8e80941Smrg if (keep_num_threads >= queue->num_threads) { 506b8e80941Smrg mtx_unlock(&queue->finish_lock); 507b8e80941Smrg return; 508b8e80941Smrg } 509b8e80941Smrg 510b8e80941Smrg mtx_lock(&queue->lock); 511b8e80941Smrg unsigned old_num_threads = queue->num_threads; 512b8e80941Smrg /* Setting num_threads is what causes the threads to terminate. 513b8e80941Smrg * Then cnd_broadcast wakes them up and they will exit their function. 514b8e80941Smrg */ 515b8e80941Smrg queue->num_threads = keep_num_threads; 516b8e80941Smrg cnd_broadcast(&queue->has_queued_cond); 517b8e80941Smrg mtx_unlock(&queue->lock); 518b8e80941Smrg 519b8e80941Smrg for (i = keep_num_threads; i < old_num_threads; i++) 520b8e80941Smrg thrd_join(queue->threads[i], NULL); 521b8e80941Smrg 522b8e80941Smrg if (!finish_locked) 523b8e80941Smrg mtx_unlock(&queue->finish_lock); 524b8e80941Smrg} 525b8e80941Smrg 526b8e80941Smrgvoid 527b8e80941Smrgutil_queue_destroy(struct util_queue *queue) 528b8e80941Smrg{ 529b8e80941Smrg util_queue_kill_threads(queue, 0, false); 530b8e80941Smrg remove_from_atexit_list(queue); 531b8e80941Smrg 532b8e80941Smrg cnd_destroy(&queue->has_space_cond); 533b8e80941Smrg cnd_destroy(&queue->has_queued_cond); 534b8e80941Smrg mtx_destroy(&queue->finish_lock); 535b8e80941Smrg mtx_destroy(&queue->lock); 536b8e80941Smrg free(queue->jobs); 537b8e80941Smrg free(queue->threads); 538b8e80941Smrg} 539b8e80941Smrg 540b8e80941Smrgvoid 541b8e80941Smrgutil_queue_add_job(struct util_queue *queue, 542b8e80941Smrg void *job, 543b8e80941Smrg struct util_queue_fence *fence, 544b8e80941Smrg util_queue_execute_func execute, 545b8e80941Smrg util_queue_execute_func cleanup) 546b8e80941Smrg{ 547b8e80941Smrg struct util_queue_job *ptr; 548b8e80941Smrg 549b8e80941Smrg mtx_lock(&queue->lock); 550b8e80941Smrg if (queue->num_threads == 0) { 551b8e80941Smrg mtx_unlock(&queue->lock); 552b8e80941Smrg /* well no good option here, but any leaks will be 553b8e80941Smrg * short-lived as things are shutting down.. 554b8e80941Smrg */ 555b8e80941Smrg return; 556b8e80941Smrg } 557b8e80941Smrg 558b8e80941Smrg util_queue_fence_reset(fence); 559b8e80941Smrg 560b8e80941Smrg assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); 561b8e80941Smrg 562b8e80941Smrg if (queue->num_queued == queue->max_jobs) { 563b8e80941Smrg if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) { 564b8e80941Smrg /* If the queue is full, make it larger to avoid waiting for a free 565b8e80941Smrg * slot. 566b8e80941Smrg */ 567b8e80941Smrg unsigned new_max_jobs = queue->max_jobs + 8; 568b8e80941Smrg struct util_queue_job *jobs = 569b8e80941Smrg (struct util_queue_job*)calloc(new_max_jobs, 570b8e80941Smrg sizeof(struct util_queue_job)); 571b8e80941Smrg assert(jobs); 572b8e80941Smrg 573b8e80941Smrg /* Copy all queued jobs into the new list. */ 574b8e80941Smrg unsigned num_jobs = 0; 575b8e80941Smrg unsigned i = queue->read_idx; 576b8e80941Smrg 577b8e80941Smrg do { 578b8e80941Smrg jobs[num_jobs++] = queue->jobs[i]; 579b8e80941Smrg i = (i + 1) % queue->max_jobs; 580b8e80941Smrg } while (i != queue->write_idx); 581b8e80941Smrg 582b8e80941Smrg assert(num_jobs == queue->num_queued); 583b8e80941Smrg 584b8e80941Smrg free(queue->jobs); 585b8e80941Smrg queue->jobs = jobs; 586b8e80941Smrg queue->read_idx = 0; 587b8e80941Smrg queue->write_idx = num_jobs; 588b8e80941Smrg queue->max_jobs = new_max_jobs; 589b8e80941Smrg } else { 590b8e80941Smrg /* Wait until there is a free slot. */ 591b8e80941Smrg while (queue->num_queued == queue->max_jobs) 592b8e80941Smrg cnd_wait(&queue->has_space_cond, &queue->lock); 593b8e80941Smrg } 594b8e80941Smrg } 595b8e80941Smrg 596b8e80941Smrg ptr = &queue->jobs[queue->write_idx]; 597b8e80941Smrg assert(ptr->job == NULL); 598b8e80941Smrg ptr->job = job; 599b8e80941Smrg ptr->fence = fence; 600b8e80941Smrg ptr->execute = execute; 601b8e80941Smrg ptr->cleanup = cleanup; 602b8e80941Smrg queue->write_idx = (queue->write_idx + 1) % queue->max_jobs; 603b8e80941Smrg 604b8e80941Smrg queue->num_queued++; 605b8e80941Smrg cnd_signal(&queue->has_queued_cond); 606b8e80941Smrg mtx_unlock(&queue->lock); 607b8e80941Smrg} 608b8e80941Smrg 609b8e80941Smrg/** 610b8e80941Smrg * Remove a queued job. If the job hasn't started execution, it's removed from 611b8e80941Smrg * the queue. If the job has started execution, the function waits for it to 612b8e80941Smrg * complete. 613b8e80941Smrg * 614b8e80941Smrg * In all cases, the fence is signalled when the function returns. 615b8e80941Smrg * 616b8e80941Smrg * The function can be used when destroying an object associated with the job 617b8e80941Smrg * when you don't care about the job completion state. 618b8e80941Smrg */ 619b8e80941Smrgvoid 620b8e80941Smrgutil_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence) 621b8e80941Smrg{ 622b8e80941Smrg bool removed = false; 623b8e80941Smrg 624b8e80941Smrg if (util_queue_fence_is_signalled(fence)) 625b8e80941Smrg return; 626b8e80941Smrg 627b8e80941Smrg mtx_lock(&queue->lock); 628b8e80941Smrg for (unsigned i = queue->read_idx; i != queue->write_idx; 629b8e80941Smrg i = (i + 1) % queue->max_jobs) { 630b8e80941Smrg if (queue->jobs[i].fence == fence) { 631b8e80941Smrg if (queue->jobs[i].cleanup) 632b8e80941Smrg queue->jobs[i].cleanup(queue->jobs[i].job, -1); 633b8e80941Smrg 634b8e80941Smrg /* Just clear it. The threads will treat as a no-op job. */ 635b8e80941Smrg memset(&queue->jobs[i], 0, sizeof(queue->jobs[i])); 636b8e80941Smrg removed = true; 637b8e80941Smrg break; 638b8e80941Smrg } 639b8e80941Smrg } 640b8e80941Smrg mtx_unlock(&queue->lock); 641b8e80941Smrg 642b8e80941Smrg if (removed) 643b8e80941Smrg util_queue_fence_signal(fence); 644b8e80941Smrg else 645b8e80941Smrg util_queue_fence_wait(fence); 646b8e80941Smrg} 647b8e80941Smrg 648b8e80941Smrgstatic void 649b8e80941Smrgutil_queue_finish_execute(void *data, int num_thread) 650b8e80941Smrg{ 651b8e80941Smrg util_barrier *barrier = data; 652b8e80941Smrg util_barrier_wait(barrier); 653b8e80941Smrg} 654b8e80941Smrg 655b8e80941Smrg/** 656b8e80941Smrg * Wait until all previously added jobs have completed. 657b8e80941Smrg */ 658b8e80941Smrgvoid 659b8e80941Smrgutil_queue_finish(struct util_queue *queue) 660b8e80941Smrg{ 661b8e80941Smrg util_barrier barrier; 662b8e80941Smrg struct util_queue_fence *fences; 663b8e80941Smrg 664b8e80941Smrg /* If 2 threads were adding jobs for 2 different barries at the same time, 665b8e80941Smrg * a deadlock would happen, because 1 barrier requires that all threads 666b8e80941Smrg * wait for it exclusively. 667b8e80941Smrg */ 668b8e80941Smrg mtx_lock(&queue->finish_lock); 669b8e80941Smrg fences = malloc(queue->num_threads * sizeof(*fences)); 670b8e80941Smrg util_barrier_init(&barrier, queue->num_threads); 671b8e80941Smrg 672b8e80941Smrg for (unsigned i = 0; i < queue->num_threads; ++i) { 673b8e80941Smrg util_queue_fence_init(&fences[i]); 674b8e80941Smrg util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL); 675b8e80941Smrg } 676b8e80941Smrg 677b8e80941Smrg for (unsigned i = 0; i < queue->num_threads; ++i) { 678b8e80941Smrg util_queue_fence_wait(&fences[i]); 679b8e80941Smrg util_queue_fence_destroy(&fences[i]); 680b8e80941Smrg } 681b8e80941Smrg mtx_unlock(&queue->finish_lock); 682b8e80941Smrg 683b8e80941Smrg util_barrier_destroy(&barrier); 684b8e80941Smrg 685b8e80941Smrg free(fences); 686b8e80941Smrg} 687b8e80941Smrg 688b8e80941Smrgint64_t 689b8e80941Smrgutil_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index) 690b8e80941Smrg{ 691b8e80941Smrg /* Allow some flexibility by not raising an error. */ 692b8e80941Smrg if (thread_index >= queue->num_threads) 693b8e80941Smrg return 0; 694b8e80941Smrg 695b8e80941Smrg return u_thread_get_time_nano(queue->threads[thread_index]); 696b8e80941Smrg} 697