1/*
2 * Copyright © 2016 Advanced Micro Devices, Inc.
3 * All Rights Reserved.
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining
6 * a copy of this software and associated documentation files (the
7 * "Software"), to deal in the Software without restriction, including
8 * without limitation the rights to use, copy, modify, merge, publish,
9 * distribute, sub license, and/or sell copies of the Software, and to
10 * permit persons to whom the Software is furnished to do so, subject to
11 * the following conditions:
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
14 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
15 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
16 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
17 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
19 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 * USE OR OTHER DEALINGS IN THE SOFTWARE.
21 *
22 * The above copyright notice and this permission notice (including the
23 * next paragraph) shall be included in all copies or substantial portions
24 * of the Software.
25 */
26
27#include "u_queue.h"
28
29#include <time.h>
30
31#include "util/os_time.h"
32#include "util/u_string.h"
33#include "util/u_thread.h"
34#include "u_process.h"
35
/* Forward declaration: atexit_handler() below calls this function before its
 * definition further down in the file.
 */
static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool finish_locked);
39
40/****************************************************************************
41 * Wait for all queues to assert idle when exit() is called.
42 *
43 * Otherwise, C++ static variable destructors can be called while threads
44 * are using the static variables.
45 */
46
/* Ensures global_init() runs exactly once (see add_to_atexit_list()). */
static once_flag atexit_once_flag = ONCE_FLAG_INIT;
/* Queues registered by add_to_atexit_list(); walked by atexit_handler(). */
static struct list_head queue_list;
/* Held while queue_list is modified or iterated. */
static mtx_t exit_mutex = _MTX_INITIALIZER_NP;
50
51#define HAVE_NOATEXIT
52#if defined(HAVE_NOATEXIT)
53static int global_init_called = 0;
54
55static void __attribute__((__destructor__))
56#else
57static void
58#endif
59atexit_handler(void)
60{
61   struct util_queue *iter;
62
63#if defined(HAVE_NOATEXIT)
64   if (!global_init_called)
65      return;
66#endif
67
68   mtx_lock(&exit_mutex);
69   /* Wait for all queues to assert idle. */
70   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
71      util_queue_kill_threads(iter, 0, false);
72   }
73   mtx_unlock(&exit_mutex);
74}
75
76static void
77global_init(void)
78{
79   LIST_INITHEAD(&queue_list);
80#if defined(HAVE_NOATEXIT)
81   global_init_called = 1;
82#else
83   atexit(atexit_handler);
84#endif
85}
86
87static void
88add_to_atexit_list(struct util_queue *queue)
89{
90   call_once(&atexit_once_flag, global_init);
91
92   mtx_lock(&exit_mutex);
93   LIST_ADD(&queue->head, &queue_list);
94   mtx_unlock(&exit_mutex);
95}
96
97static void
98remove_from_atexit_list(struct util_queue *queue)
99{
100   struct util_queue *iter, *tmp;
101
102   mtx_lock(&exit_mutex);
103   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
104      if (iter == queue) {
105         LIST_DEL(&iter->head);
106         break;
107      }
108   }
109   mtx_unlock(&exit_mutex);
110}
111
112/****************************************************************************
113 * util_queue_fence
114 */
115
116#ifdef UTIL_QUEUE_FENCE_FUTEX
117static bool
118do_futex_fence_wait(struct util_queue_fence *fence,
119                    bool timeout, int64_t abs_timeout)
120{
121   uint32_t v = fence->val;
122   struct timespec ts;
123   ts.tv_sec = abs_timeout / (1000*1000*1000);
124   ts.tv_nsec = abs_timeout % (1000*1000*1000);
125
126   while (v != 0) {
127      if (v != 2) {
128         v = p_atomic_cmpxchg(&fence->val, 1, 2);
129         if (v == 0)
130            return true;
131      }
132
133      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
134      if (timeout && r < 0) {
135         if (errno == ETIMEDOUT)
136            return false;
137      }
138
139      v = fence->val;
140   }
141
142   return true;
143}
144
/* Futex variant: wait for the fence without any time limit. */
void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   /* Without a timeout the helper only returns once the fence value is 0,
    * so its result carries no information here.
    */
   (void) do_futex_fence_wait(fence, false, 0);
}
150
/* Futex variant: wait for the fence until abs_timeout (nanoseconds).
 * Returns the result of do_futex_fence_wait(): false on timeout.
 */
bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   const bool completed = do_futex_fence_wait(fence, true, abs_timeout);

   return completed;
}
157
158#endif
159
160#ifdef UTIL_QUEUE_FENCE_STANDARD
161void
162util_queue_fence_signal(struct util_queue_fence *fence)
163{
164   mtx_lock(&fence->mutex);
165   fence->signalled = true;
166   cnd_broadcast(&fence->cond);
167   mtx_unlock(&fence->mutex);
168}
169
170void
171_util_queue_fence_wait(struct util_queue_fence *fence)
172{
173   mtx_lock(&fence->mutex);
174   while (!fence->signalled)
175      cnd_wait(&fence->cond, &fence->mutex);
176   mtx_unlock(&fence->mutex);
177}
178
179bool
180_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
181                               int64_t abs_timeout)
182{
183   /* This terrible hack is made necessary by the fact that we really want an
184    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
185    * to be relative to the TIME_UTC clock.
186    */
187   int64_t rel = abs_timeout - os_time_get_nano();
188
189   if (rel > 0) {
190      struct timespec ts;
191
192      timespec_get(&ts, TIME_UTC);
193
194      ts.tv_sec += abs_timeout / (1000*1000*1000);
195      ts.tv_nsec += abs_timeout % (1000*1000*1000);
196      if (ts.tv_nsec >= (1000*1000*1000)) {
197         ts.tv_sec++;
198         ts.tv_nsec -= (1000*1000*1000);
199      }
200
201      mtx_lock(&fence->mutex);
202      while (!fence->signalled) {
203         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
204            break;
205      }
206      mtx_unlock(&fence->mutex);
207   }
208
209   return fence->signalled;
210}
211
212void
213util_queue_fence_init(struct util_queue_fence *fence)
214{
215   memset(fence, 0, sizeof(*fence));
216   (void) mtx_init(&fence->mutex, mtx_plain);
217   cnd_init(&fence->cond);
218   fence->signalled = true;
219}
220
221void
222util_queue_fence_destroy(struct util_queue_fence *fence)
223{
224   assert(fence->signalled);
225
226   /* Ensure that another thread is not in the middle of
227    * util_queue_fence_signal (having set the fence to signalled but still
228    * holding the fence mutex).
229    *
230    * A common contract between threads is that as soon as a fence is signalled
231    * by thread A, thread B is allowed to destroy it. Since
232    * util_queue_fence_is_signalled does not lock the fence mutex (for
233    * performance reasons), we must do so here.
234    */
235   mtx_lock(&fence->mutex);
236   mtx_unlock(&fence->mutex);
237
238   cnd_destroy(&fence->cond);
239   mtx_destroy(&fence->mutex);
240}
241#endif
242
243/****************************************************************************
244 * util_queue implementation
245 */
246
/* Arguments handed to util_queue_thread_func(). Allocated by
 * util_queue_create_thread() and freed by the thread function after it has
 * copied both fields.
 */
struct thread_input {
   struct util_queue *queue;   /* queue the new thread serves */
   int thread_index;           /* index of the thread in queue->threads */
};
251
252static int
253util_queue_thread_func(void *input)
254{
255   struct util_queue *queue = ((struct thread_input*)input)->queue;
256   int thread_index = ((struct thread_input*)input)->thread_index;
257
258   free(input);
259
260#ifdef HAVE_PTHREAD_SETAFFINITY
261   if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
262      /* Don't inherit the thread affinity from the parent thread.
263       * Set the full mask.
264       */
265#if defined(__NetBSD__)
266      cpuset_t *cpuset;
267      cpuset = cpuset_create();
268      if (cpuset != NULL) {
269         cpuset_zero(cpuset);
270         for (unsigned i = 0; i < cpuset_size(cpuset); i++)
271            cpuset_set(i, cpuset);
272
273         pthread_setaffinity_np(pthread_self(), cpuset_size(cpuset), cpuset);
274         cpuset_destroy(cpuset);
275      }
276#else
277      cpu_set_t cpuset;
278      CPU_ZERO(&cpuset);
279      for (unsigned i = 0; i < CPU_SETSIZE; i++)
280         CPU_SET(i, &cpuset);
281
282      pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
283#endif
284   }
285#endif
286
287   if (strlen(queue->name) > 0) {
288      char name[16];
289      util_snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
290      u_thread_setname(name);
291   }
292
293   while (1) {
294      struct util_queue_job job;
295
296      mtx_lock(&queue->lock);
297      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
298
299      /* wait if the queue is empty */
300      while (thread_index < queue->num_threads && queue->num_queued == 0)
301         cnd_wait(&queue->has_queued_cond, &queue->lock);
302
303      /* only kill threads that are above "num_threads" */
304      if (thread_index >= queue->num_threads) {
305         mtx_unlock(&queue->lock);
306         break;
307      }
308
309      job = queue->jobs[queue->read_idx];
310      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
311      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
312
313      queue->num_queued--;
314      cnd_signal(&queue->has_space_cond);
315      mtx_unlock(&queue->lock);
316
317      if (job.job) {
318         job.execute(job.job, thread_index);
319         util_queue_fence_signal(job.fence);
320         if (job.cleanup)
321            job.cleanup(job.job, thread_index);
322      }
323   }
324
325   /* signal remaining jobs if all threads are being terminated */
326   mtx_lock(&queue->lock);
327   if (queue->num_threads == 0) {
328      for (unsigned i = queue->read_idx; i != queue->write_idx;
329           i = (i + 1) % queue->max_jobs) {
330         if (queue->jobs[i].job) {
331            util_queue_fence_signal(queue->jobs[i].fence);
332            queue->jobs[i].job = NULL;
333         }
334      }
335      queue->read_idx = queue->write_idx;
336      queue->num_queued = 0;
337   }
338   mtx_unlock(&queue->lock);
339   return 0;
340}
341
342static bool
343util_queue_create_thread(struct util_queue *queue, unsigned index)
344{
345   struct thread_input *input =
346      (struct thread_input *) malloc(sizeof(struct thread_input));
347   input->queue = queue;
348   input->thread_index = index;
349
350   queue->threads[index] = u_thread_create(util_queue_thread_func, input);
351
352   if (!queue->threads[index]) {
353      free(input);
354      return false;
355   }
356
357   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
358#if defined(__linux__) && defined(SCHED_IDLE)
359      struct sched_param sched_param = {0};
360
361      /* The nice() function can only set a maximum of 19.
362       * SCHED_IDLE is the same as nice = 20.
363       *
364       * Note that Linux only allows decreasing the priority. The original
365       * priority can't be restored.
366       */
367      pthread_setschedparam(queue->threads[index], SCHED_IDLE, &sched_param);
368#endif
369   }
370   return true;
371}
372
373void
374util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
375{
376   num_threads = MIN2(num_threads, queue->max_threads);
377   num_threads = MAX2(num_threads, 1);
378
379   mtx_lock(&queue->finish_lock);
380   unsigned old_num_threads = queue->num_threads;
381
382   if (num_threads == old_num_threads) {
383      mtx_unlock(&queue->finish_lock);
384      return;
385   }
386
387   if (num_threads < old_num_threads) {
388      util_queue_kill_threads(queue, num_threads, true);
389      mtx_unlock(&queue->finish_lock);
390      return;
391   }
392
393   /* Create threads.
394    *
395    * We need to update num_threads first, because threads terminate
396    * when thread_index < num_threads.
397    */
398   queue->num_threads = num_threads;
399   for (unsigned i = old_num_threads; i < num_threads; i++) {
400      if (!util_queue_create_thread(queue, i))
401         break;
402   }
403   mtx_unlock(&queue->finish_lock);
404}
405
406bool
407util_queue_init(struct util_queue *queue,
408                const char *name,
409                unsigned max_jobs,
410                unsigned num_threads,
411                unsigned flags)
412{
413   unsigned i;
414
415   /* Form the thread name from process_name and name, limited to 13
416    * characters. Characters 14-15 are reserved for the thread number.
417    * Character 16 should be 0. Final form: "process:name12"
418    *
419    * If name is too long, it's truncated. If any space is left, the process
420    * name fills it.
421    */
422   const char *process_name = util_get_process_name();
423   int process_len = process_name ? strlen(process_name) : 0;
424   int name_len = strlen(name);
425   const int max_chars = sizeof(queue->name) - 1;
426
427   name_len = MIN2(name_len, max_chars);
428
429   /* See if there is any space left for the process name, reserve 1 for
430    * the colon. */
431   process_len = MIN2(process_len, max_chars - name_len - 1);
432   process_len = MAX2(process_len, 0);
433
434   memset(queue, 0, sizeof(*queue));
435
436   if (process_len) {
437      util_snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
438                    process_len, process_name, name);
439   } else {
440      util_snprintf(queue->name, sizeof(queue->name), "%s", name);
441   }
442
443   queue->flags = flags;
444   queue->max_threads = num_threads;
445   queue->num_threads = num_threads;
446   queue->max_jobs = max_jobs;
447
448   queue->jobs = (struct util_queue_job*)
449                 calloc(max_jobs, sizeof(struct util_queue_job));
450   if (!queue->jobs)
451      goto fail;
452
453   (void) mtx_init(&queue->lock, mtx_plain);
454   (void) mtx_init(&queue->finish_lock, mtx_plain);
455
456   queue->num_queued = 0;
457   cnd_init(&queue->has_queued_cond);
458   cnd_init(&queue->has_space_cond);
459
460   queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t));
461   if (!queue->threads)
462      goto fail;
463
464   /* start threads */
465   for (i = 0; i < num_threads; i++) {
466      if (!util_queue_create_thread(queue, i)) {
467         if (i == 0) {
468            /* no threads created, fail */
469            goto fail;
470         } else {
471            /* at least one thread created, so use it */
472            queue->num_threads = i;
473            break;
474         }
475      }
476   }
477
478   add_to_atexit_list(queue);
479   return true;
480
481fail:
482   free(queue->threads);
483
484   if (queue->jobs) {
485      cnd_destroy(&queue->has_space_cond);
486      cnd_destroy(&queue->has_queued_cond);
487      mtx_destroy(&queue->lock);
488      free(queue->jobs);
489   }
490   /* also util_queue_is_initialized can be used to check for success */
491   memset(queue, 0, sizeof(*queue));
492   return false;
493}
494
495static void
496util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
497                        bool finish_locked)
498{
499   unsigned i;
500
501   /* Signal all threads to terminate. */
502   if (!finish_locked)
503      mtx_lock(&queue->finish_lock);
504
505   if (keep_num_threads >= queue->num_threads) {
506      mtx_unlock(&queue->finish_lock);
507      return;
508   }
509
510   mtx_lock(&queue->lock);
511   unsigned old_num_threads = queue->num_threads;
512   /* Setting num_threads is what causes the threads to terminate.
513    * Then cnd_broadcast wakes them up and they will exit their function.
514    */
515   queue->num_threads = keep_num_threads;
516   cnd_broadcast(&queue->has_queued_cond);
517   mtx_unlock(&queue->lock);
518
519   for (i = keep_num_threads; i < old_num_threads; i++)
520      thrd_join(queue->threads[i], NULL);
521
522   if (!finish_locked)
523      mtx_unlock(&queue->finish_lock);
524}
525
526void
527util_queue_destroy(struct util_queue *queue)
528{
529   util_queue_kill_threads(queue, 0, false);
530   remove_from_atexit_list(queue);
531
532   cnd_destroy(&queue->has_space_cond);
533   cnd_destroy(&queue->has_queued_cond);
534   mtx_destroy(&queue->finish_lock);
535   mtx_destroy(&queue->lock);
536   free(queue->jobs);
537   free(queue->threads);
538}
539
540void
541util_queue_add_job(struct util_queue *queue,
542                   void *job,
543                   struct util_queue_fence *fence,
544                   util_queue_execute_func execute,
545                   util_queue_execute_func cleanup)
546{
547   struct util_queue_job *ptr;
548
549   mtx_lock(&queue->lock);
550   if (queue->num_threads == 0) {
551      mtx_unlock(&queue->lock);
552      /* well no good option here, but any leaks will be
553       * short-lived as things are shutting down..
554       */
555      return;
556   }
557
558   util_queue_fence_reset(fence);
559
560   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
561
562   if (queue->num_queued == queue->max_jobs) {
563      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) {
564         /* If the queue is full, make it larger to avoid waiting for a free
565          * slot.
566          */
567         unsigned new_max_jobs = queue->max_jobs + 8;
568         struct util_queue_job *jobs =
569            (struct util_queue_job*)calloc(new_max_jobs,
570                                           sizeof(struct util_queue_job));
571         assert(jobs);
572
573         /* Copy all queued jobs into the new list. */
574         unsigned num_jobs = 0;
575         unsigned i = queue->read_idx;
576
577         do {
578            jobs[num_jobs++] = queue->jobs[i];
579            i = (i + 1) % queue->max_jobs;
580         } while (i != queue->write_idx);
581
582         assert(num_jobs == queue->num_queued);
583
584         free(queue->jobs);
585         queue->jobs = jobs;
586         queue->read_idx = 0;
587         queue->write_idx = num_jobs;
588         queue->max_jobs = new_max_jobs;
589      } else {
590         /* Wait until there is a free slot. */
591         while (queue->num_queued == queue->max_jobs)
592            cnd_wait(&queue->has_space_cond, &queue->lock);
593      }
594   }
595
596   ptr = &queue->jobs[queue->write_idx];
597   assert(ptr->job == NULL);
598   ptr->job = job;
599   ptr->fence = fence;
600   ptr->execute = execute;
601   ptr->cleanup = cleanup;
602   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
603
604   queue->num_queued++;
605   cnd_signal(&queue->has_queued_cond);
606   mtx_unlock(&queue->lock);
607}
608
609/**
610 * Remove a queued job. If the job hasn't started execution, it's removed from
611 * the queue. If the job has started execution, the function waits for it to
612 * complete.
613 *
614 * In all cases, the fence is signalled when the function returns.
615 *
616 * The function can be used when destroying an object associated with the job
617 * when you don't care about the job completion state.
618 */
619void
620util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
621{
622   bool removed = false;
623
624   if (util_queue_fence_is_signalled(fence))
625      return;
626
627   mtx_lock(&queue->lock);
628   for (unsigned i = queue->read_idx; i != queue->write_idx;
629        i = (i + 1) % queue->max_jobs) {
630      if (queue->jobs[i].fence == fence) {
631         if (queue->jobs[i].cleanup)
632            queue->jobs[i].cleanup(queue->jobs[i].job, -1);
633
634         /* Just clear it. The threads will treat as a no-op job. */
635         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
636         removed = true;
637         break;
638      }
639   }
640   mtx_unlock(&queue->lock);
641
642   if (removed)
643      util_queue_fence_signal(fence);
644   else
645      util_queue_fence_wait(fence);
646}
647
648static void
649util_queue_finish_execute(void *data, int num_thread)
650{
651   util_barrier *barrier = data;
652   util_barrier_wait(barrier);
653}
654
655/**
656 * Wait until all previously added jobs have completed.
657 */
658void
659util_queue_finish(struct util_queue *queue)
660{
661   util_barrier barrier;
662   struct util_queue_fence *fences;
663
664   /* If 2 threads were adding jobs for 2 different barries at the same time,
665    * a deadlock would happen, because 1 barrier requires that all threads
666    * wait for it exclusively.
667    */
668   mtx_lock(&queue->finish_lock);
669   fences = malloc(queue->num_threads * sizeof(*fences));
670   util_barrier_init(&barrier, queue->num_threads);
671
672   for (unsigned i = 0; i < queue->num_threads; ++i) {
673      util_queue_fence_init(&fences[i]);
674      util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL);
675   }
676
677   for (unsigned i = 0; i < queue->num_threads; ++i) {
678      util_queue_fence_wait(&fences[i]);
679      util_queue_fence_destroy(&fences[i]);
680   }
681   mtx_unlock(&queue->finish_lock);
682
683   util_barrier_destroy(&barrier);
684
685   free(fences);
686}
687
688int64_t
689util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
690{
691   /* Allow some flexibility by not raising an error. */
692   if (thread_index >= queue->num_threads)
693      return 0;
694
695   return u_thread_get_time_nano(queue->threads[thread_index]);
696}
697