1/* 2 * Copyright © 2016 Advanced Micro Devices, Inc. 3 * All Rights Reserved. 4 * 5 * Permission is hereby granted, free of charge, to any person obtaining 6 * a copy of this software and associated documentation files (the 7 * "Software"), to deal in the Software without restriction, including 8 * without limitation the rights to use, copy, modify, merge, publish, 9 * distribute, sub license, and/or sell copies of the Software, and to 10 * permit persons to whom the Software is furnished to do so, subject to 11 * the following conditions: 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 14 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 15 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 16 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS 17 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 19 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20 * USE OR OTHER DEALINGS IN THE SOFTWARE. 21 * 22 * The above copyright notice and this permission notice (including the 23 * next paragraph) shall be included in all copies or substantial portions 24 * of the Software. 25 */ 26 27#include "u_queue.h" 28 29#include <time.h> 30 31#include "util/os_time.h" 32#include "util/u_string.h" 33#include "util/u_thread.h" 34#include "u_process.h" 35 36static void 37util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, 38 bool finish_locked); 39 40/**************************************************************************** 41 * Wait for all queues to assert idle when exit() is called. 42 * 43 * Otherwise, C++ static variable destructors can be called while threads 44 * are using the static variables. 45 */ 46 47static once_flag atexit_once_flag = ONCE_FLAG_INIT; 48static struct list_head queue_list; 49static mtx_t exit_mutex = _MTX_INITIALIZER_NP; 50 51#define HAVE_NOATEXIT 52#if defined(HAVE_NOATEXIT) 53static int global_init_called = 0; 54 55static void __attribute__((__destructor__)) 56#else 57static void 58#endif 59atexit_handler(void) 60{ 61 struct util_queue *iter; 62 63#if defined(HAVE_NOATEXIT) 64 if (!global_init_called) 65 return; 66#endif 67 68 mtx_lock(&exit_mutex); 69 /* Wait for all queues to assert idle. */ 70 LIST_FOR_EACH_ENTRY(iter, &queue_list, head) { 71 util_queue_kill_threads(iter, 0, false); 72 } 73 mtx_unlock(&exit_mutex); 74} 75 76static void 77global_init(void) 78{ 79 LIST_INITHEAD(&queue_list); 80#if defined(HAVE_NOATEXIT) 81 global_init_called = 1; 82#else 83 atexit(atexit_handler); 84#endif 85} 86 87static void 88add_to_atexit_list(struct util_queue *queue) 89{ 90 call_once(&atexit_once_flag, global_init); 91 92 mtx_lock(&exit_mutex); 93 LIST_ADD(&queue->head, &queue_list); 94 mtx_unlock(&exit_mutex); 95} 96 97static void 98remove_from_atexit_list(struct util_queue *queue) 99{ 100 struct util_queue *iter, *tmp; 101 102 mtx_lock(&exit_mutex); 103 LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) { 104 if (iter == queue) { 105 LIST_DEL(&iter->head); 106 break; 107 } 108 } 109 mtx_unlock(&exit_mutex); 110} 111 112/**************************************************************************** 113 * util_queue_fence 114 */ 115 116#ifdef UTIL_QUEUE_FENCE_FUTEX 117static bool 118do_futex_fence_wait(struct util_queue_fence *fence, 119 bool timeout, int64_t abs_timeout) 120{ 121 uint32_t v = fence->val; 122 struct timespec ts; 123 ts.tv_sec = abs_timeout / (1000*1000*1000); 124 ts.tv_nsec = abs_timeout % (1000*1000*1000); 125 126 while (v != 0) { 127 if (v != 2) { 128 v = p_atomic_cmpxchg(&fence->val, 1, 2); 129 if (v == 0) 130 return true; 131 } 132 133 int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL); 134 if (timeout && r < 0) { 135 if (errno == ETIMEDOUT) 136 return false; 137 } 138 139 v = fence->val; 140 } 141 142 return true; 143} 144 145void 146_util_queue_fence_wait(struct util_queue_fence *fence) 147{ 148 do_futex_fence_wait(fence, false, 0); 149} 150 151bool 152_util_queue_fence_wait_timeout(struct util_queue_fence *fence, 153 int64_t abs_timeout) 154{ 155 return do_futex_fence_wait(fence, true, abs_timeout); 156} 157 158#endif 159 160#ifdef UTIL_QUEUE_FENCE_STANDARD 161void 162util_queue_fence_signal(struct util_queue_fence *fence) 163{ 164 mtx_lock(&fence->mutex); 165 fence->signalled = true; 166 cnd_broadcast(&fence->cond); 167 mtx_unlock(&fence->mutex); 168} 169 170void 171_util_queue_fence_wait(struct util_queue_fence *fence) 172{ 173 mtx_lock(&fence->mutex); 174 while (!fence->signalled) 175 cnd_wait(&fence->cond, &fence->mutex); 176 mtx_unlock(&fence->mutex); 177} 178 179bool 180_util_queue_fence_wait_timeout(struct util_queue_fence *fence, 181 int64_t abs_timeout) 182{ 183 /* This terrible hack is made necessary by the fact that we really want an 184 * internal interface consistent with os_time_*, but cnd_timedwait is spec'd 185 * to be relative to the TIME_UTC clock. 186 */ 187 int64_t rel = abs_timeout - os_time_get_nano(); 188 189 if (rel > 0) { 190 struct timespec ts; 191 192 timespec_get(&ts, TIME_UTC); 193 194 ts.tv_sec += abs_timeout / (1000*1000*1000); 195 ts.tv_nsec += abs_timeout % (1000*1000*1000); 196 if (ts.tv_nsec >= (1000*1000*1000)) { 197 ts.tv_sec++; 198 ts.tv_nsec -= (1000*1000*1000); 199 } 200 201 mtx_lock(&fence->mutex); 202 while (!fence->signalled) { 203 if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success) 204 break; 205 } 206 mtx_unlock(&fence->mutex); 207 } 208 209 return fence->signalled; 210} 211 212void 213util_queue_fence_init(struct util_queue_fence *fence) 214{ 215 memset(fence, 0, sizeof(*fence)); 216 (void) mtx_init(&fence->mutex, mtx_plain); 217 cnd_init(&fence->cond); 218 fence->signalled = true; 219} 220 221void 222util_queue_fence_destroy(struct util_queue_fence *fence) 223{ 224 assert(fence->signalled); 225 226 /* Ensure that another thread is not in the middle of 227 * util_queue_fence_signal (having set the fence to signalled but still 228 * holding the fence mutex). 229 * 230 * A common contract between threads is that as soon as a fence is signalled 231 * by thread A, thread B is allowed to destroy it. Since 232 * util_queue_fence_is_signalled does not lock the fence mutex (for 233 * performance reasons), we must do so here. 234 */ 235 mtx_lock(&fence->mutex); 236 mtx_unlock(&fence->mutex); 237 238 cnd_destroy(&fence->cond); 239 mtx_destroy(&fence->mutex); 240} 241#endif 242 243/**************************************************************************** 244 * util_queue implementation 245 */ 246 247struct thread_input { 248 struct util_queue *queue; 249 int thread_index; 250}; 251 252static int 253util_queue_thread_func(void *input) 254{ 255 struct util_queue *queue = ((struct thread_input*)input)->queue; 256 int thread_index = ((struct thread_input*)input)->thread_index; 257 258 free(input); 259 260#ifdef HAVE_PTHREAD_SETAFFINITY 261 if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) { 262 /* Don't inherit the thread affinity from the parent thread. 263 * Set the full mask. 264 */ 265#if defined(__NetBSD__) 266 cpuset_t *cpuset; 267 cpuset = cpuset_create(); 268 if (cpuset != NULL) { 269 cpuset_zero(cpuset); 270 for (unsigned i = 0; i < cpuset_size(cpuset); i++) 271 cpuset_set(i, cpuset); 272 273 pthread_setaffinity_np(pthread_self(), cpuset_size(cpuset), cpuset); 274 cpuset_destroy(cpuset); 275 } 276#else 277 cpu_set_t cpuset; 278 CPU_ZERO(&cpuset); 279 for (unsigned i = 0; i < CPU_SETSIZE; i++) 280 CPU_SET(i, &cpuset); 281 282 pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); 283#endif 284 } 285#endif 286 287 if (strlen(queue->name) > 0) { 288 char name[16]; 289 util_snprintf(name, sizeof(name), "%s%i", queue->name, thread_index); 290 u_thread_setname(name); 291 } 292 293 while (1) { 294 struct util_queue_job job; 295 296 mtx_lock(&queue->lock); 297 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); 298 299 /* wait if the queue is empty */ 300 while (thread_index < queue->num_threads && queue->num_queued == 0) 301 cnd_wait(&queue->has_queued_cond, &queue->lock); 302 303 /* only kill threads that are above "num_threads" */ 304 if (thread_index >= queue->num_threads) { 305 mtx_unlock(&queue->lock); 306 break; 307 } 308 309 job = queue->jobs[queue->read_idx]; 310 memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job)); 311 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; 312 313 queue->num_queued--; 314 cnd_signal(&queue->has_space_cond); 315 mtx_unlock(&queue->lock); 316 317 if (job.job) { 318 job.execute(job.job, thread_index); 319 util_queue_fence_signal(job.fence); 320 if (job.cleanup) 321 job.cleanup(job.job, thread_index); 322 } 323 } 324 325 /* signal remaining jobs if all threads are being terminated */ 326 mtx_lock(&queue->lock); 327 if (queue->num_threads == 0) { 328 for (unsigned i = queue->read_idx; i != queue->write_idx; 329 i = (i + 1) % queue->max_jobs) { 330 if (queue->jobs[i].job) { 331 util_queue_fence_signal(queue->jobs[i].fence); 332 queue->jobs[i].job = NULL; 333 } 334 } 335 queue->read_idx = queue->write_idx; 336 queue->num_queued = 0; 337 } 338 mtx_unlock(&queue->lock); 339 return 0; 340} 341 342static bool 343util_queue_create_thread(struct util_queue *queue, unsigned index) 344{ 345 struct thread_input *input = 346 (struct thread_input *) malloc(sizeof(struct thread_input)); 347 input->queue = queue; 348 input->thread_index = index; 349 350 queue->threads[index] = u_thread_create(util_queue_thread_func, input); 351 352 if (!queue->threads[index]) { 353 free(input); 354 return false; 355 } 356 357 if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) { 358#if defined(__linux__) && defined(SCHED_IDLE) 359 struct sched_param sched_param = {0}; 360 361 /* The nice() function can only set a maximum of 19. 362 * SCHED_IDLE is the same as nice = 20. 363 * 364 * Note that Linux only allows decreasing the priority. The original 365 * priority can't be restored. 366 */ 367 pthread_setschedparam(queue->threads[index], SCHED_IDLE, &sched_param); 368#endif 369 } 370 return true; 371} 372 373void 374util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads) 375{ 376 num_threads = MIN2(num_threads, queue->max_threads); 377 num_threads = MAX2(num_threads, 1); 378 379 mtx_lock(&queue->finish_lock); 380 unsigned old_num_threads = queue->num_threads; 381 382 if (num_threads == old_num_threads) { 383 mtx_unlock(&queue->finish_lock); 384 return; 385 } 386 387 if (num_threads < old_num_threads) { 388 util_queue_kill_threads(queue, num_threads, true); 389 mtx_unlock(&queue->finish_lock); 390 return; 391 } 392 393 /* Create threads. 394 * 395 * We need to update num_threads first, because threads terminate 396 * when thread_index < num_threads. 397 */ 398 queue->num_threads = num_threads; 399 for (unsigned i = old_num_threads; i < num_threads; i++) { 400 if (!util_queue_create_thread(queue, i)) 401 break; 402 } 403 mtx_unlock(&queue->finish_lock); 404} 405 406bool 407util_queue_init(struct util_queue *queue, 408 const char *name, 409 unsigned max_jobs, 410 unsigned num_threads, 411 unsigned flags) 412{ 413 unsigned i; 414 415 /* Form the thread name from process_name and name, limited to 13 416 * characters. Characters 14-15 are reserved for the thread number. 417 * Character 16 should be 0. Final form: "process:name12" 418 * 419 * If name is too long, it's truncated. If any space is left, the process 420 * name fills it. 421 */ 422 const char *process_name = util_get_process_name(); 423 int process_len = process_name ? strlen(process_name) : 0; 424 int name_len = strlen(name); 425 const int max_chars = sizeof(queue->name) - 1; 426 427 name_len = MIN2(name_len, max_chars); 428 429 /* See if there is any space left for the process name, reserve 1 for 430 * the colon. */ 431 process_len = MIN2(process_len, max_chars - name_len - 1); 432 process_len = MAX2(process_len, 0); 433 434 memset(queue, 0, sizeof(*queue)); 435 436 if (process_len) { 437 util_snprintf(queue->name, sizeof(queue->name), "%.*s:%s", 438 process_len, process_name, name); 439 } else { 440 util_snprintf(queue->name, sizeof(queue->name), "%s", name); 441 } 442 443 queue->flags = flags; 444 queue->max_threads = num_threads; 445 queue->num_threads = num_threads; 446 queue->max_jobs = max_jobs; 447 448 queue->jobs = (struct util_queue_job*) 449 calloc(max_jobs, sizeof(struct util_queue_job)); 450 if (!queue->jobs) 451 goto fail; 452 453 (void) mtx_init(&queue->lock, mtx_plain); 454 (void) mtx_init(&queue->finish_lock, mtx_plain); 455 456 queue->num_queued = 0; 457 cnd_init(&queue->has_queued_cond); 458 cnd_init(&queue->has_space_cond); 459 460 queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t)); 461 if (!queue->threads) 462 goto fail; 463 464 /* start threads */ 465 for (i = 0; i < num_threads; i++) { 466 if (!util_queue_create_thread(queue, i)) { 467 if (i == 0) { 468 /* no threads created, fail */ 469 goto fail; 470 } else { 471 /* at least one thread created, so use it */ 472 queue->num_threads = i; 473 break; 474 } 475 } 476 } 477 478 add_to_atexit_list(queue); 479 return true; 480 481fail: 482 free(queue->threads); 483 484 if (queue->jobs) { 485 cnd_destroy(&queue->has_space_cond); 486 cnd_destroy(&queue->has_queued_cond); 487 mtx_destroy(&queue->lock); 488 free(queue->jobs); 489 } 490 /* also util_queue_is_initialized can be used to check for success */ 491 memset(queue, 0, sizeof(*queue)); 492 return false; 493} 494 495static void 496util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, 497 bool finish_locked) 498{ 499 unsigned i; 500 501 /* Signal all threads to terminate. */ 502 if (!finish_locked) 503 mtx_lock(&queue->finish_lock); 504 505 if (keep_num_threads >= queue->num_threads) { 506 mtx_unlock(&queue->finish_lock); 507 return; 508 } 509 510 mtx_lock(&queue->lock); 511 unsigned old_num_threads = queue->num_threads; 512 /* Setting num_threads is what causes the threads to terminate. 513 * Then cnd_broadcast wakes them up and they will exit their function. 514 */ 515 queue->num_threads = keep_num_threads; 516 cnd_broadcast(&queue->has_queued_cond); 517 mtx_unlock(&queue->lock); 518 519 for (i = keep_num_threads; i < old_num_threads; i++) 520 thrd_join(queue->threads[i], NULL); 521 522 if (!finish_locked) 523 mtx_unlock(&queue->finish_lock); 524} 525 526void 527util_queue_destroy(struct util_queue *queue) 528{ 529 util_queue_kill_threads(queue, 0, false); 530 remove_from_atexit_list(queue); 531 532 cnd_destroy(&queue->has_space_cond); 533 cnd_destroy(&queue->has_queued_cond); 534 mtx_destroy(&queue->finish_lock); 535 mtx_destroy(&queue->lock); 536 free(queue->jobs); 537 free(queue->threads); 538} 539 540void 541util_queue_add_job(struct util_queue *queue, 542 void *job, 543 struct util_queue_fence *fence, 544 util_queue_execute_func execute, 545 util_queue_execute_func cleanup) 546{ 547 struct util_queue_job *ptr; 548 549 mtx_lock(&queue->lock); 550 if (queue->num_threads == 0) { 551 mtx_unlock(&queue->lock); 552 /* well no good option here, but any leaks will be 553 * short-lived as things are shutting down.. 554 */ 555 return; 556 } 557 558 util_queue_fence_reset(fence); 559 560 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); 561 562 if (queue->num_queued == queue->max_jobs) { 563 if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) { 564 /* If the queue is full, make it larger to avoid waiting for a free 565 * slot. 566 */ 567 unsigned new_max_jobs = queue->max_jobs + 8; 568 struct util_queue_job *jobs = 569 (struct util_queue_job*)calloc(new_max_jobs, 570 sizeof(struct util_queue_job)); 571 assert(jobs); 572 573 /* Copy all queued jobs into the new list. */ 574 unsigned num_jobs = 0; 575 unsigned i = queue->read_idx; 576 577 do { 578 jobs[num_jobs++] = queue->jobs[i]; 579 i = (i + 1) % queue->max_jobs; 580 } while (i != queue->write_idx); 581 582 assert(num_jobs == queue->num_queued); 583 584 free(queue->jobs); 585 queue->jobs = jobs; 586 queue->read_idx = 0; 587 queue->write_idx = num_jobs; 588 queue->max_jobs = new_max_jobs; 589 } else { 590 /* Wait until there is a free slot. */ 591 while (queue->num_queued == queue->max_jobs) 592 cnd_wait(&queue->has_space_cond, &queue->lock); 593 } 594 } 595 596 ptr = &queue->jobs[queue->write_idx]; 597 assert(ptr->job == NULL); 598 ptr->job = job; 599 ptr->fence = fence; 600 ptr->execute = execute; 601 ptr->cleanup = cleanup; 602 queue->write_idx = (queue->write_idx + 1) % queue->max_jobs; 603 604 queue->num_queued++; 605 cnd_signal(&queue->has_queued_cond); 606 mtx_unlock(&queue->lock); 607} 608 609/** 610 * Remove a queued job. If the job hasn't started execution, it's removed from 611 * the queue. If the job has started execution, the function waits for it to 612 * complete. 613 * 614 * In all cases, the fence is signalled when the function returns. 615 * 616 * The function can be used when destroying an object associated with the job 617 * when you don't care about the job completion state. 618 */ 619void 620util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence) 621{ 622 bool removed = false; 623 624 if (util_queue_fence_is_signalled(fence)) 625 return; 626 627 mtx_lock(&queue->lock); 628 for (unsigned i = queue->read_idx; i != queue->write_idx; 629 i = (i + 1) % queue->max_jobs) { 630 if (queue->jobs[i].fence == fence) { 631 if (queue->jobs[i].cleanup) 632 queue->jobs[i].cleanup(queue->jobs[i].job, -1); 633 634 /* Just clear it. The threads will treat as a no-op job. */ 635 memset(&queue->jobs[i], 0, sizeof(queue->jobs[i])); 636 removed = true; 637 break; 638 } 639 } 640 mtx_unlock(&queue->lock); 641 642 if (removed) 643 util_queue_fence_signal(fence); 644 else 645 util_queue_fence_wait(fence); 646} 647 648static void 649util_queue_finish_execute(void *data, int num_thread) 650{ 651 util_barrier *barrier = data; 652 util_barrier_wait(barrier); 653} 654 655/** 656 * Wait until all previously added jobs have completed. 657 */ 658void 659util_queue_finish(struct util_queue *queue) 660{ 661 util_barrier barrier; 662 struct util_queue_fence *fences; 663 664 /* If 2 threads were adding jobs for 2 different barries at the same time, 665 * a deadlock would happen, because 1 barrier requires that all threads 666 * wait for it exclusively. 667 */ 668 mtx_lock(&queue->finish_lock); 669 fences = malloc(queue->num_threads * sizeof(*fences)); 670 util_barrier_init(&barrier, queue->num_threads); 671 672 for (unsigned i = 0; i < queue->num_threads; ++i) { 673 util_queue_fence_init(&fences[i]); 674 util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL); 675 } 676 677 for (unsigned i = 0; i < queue->num_threads; ++i) { 678 util_queue_fence_wait(&fences[i]); 679 util_queue_fence_destroy(&fences[i]); 680 } 681 mtx_unlock(&queue->finish_lock); 682 683 util_barrier_destroy(&barrier); 684 685 free(fences); 686} 687 688int64_t 689util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index) 690{ 691 /* Allow some flexibility by not raising an error. */ 692 if (thread_index >= queue->num_threads) 693 return 0; 694 695 return u_thread_get_time_nano(queue->threads[thread_index]); 696} 697