1b8e80941Smrg/*
2b8e80941Smrg * Copyright © 2016 Advanced Micro Devices, Inc.
3b8e80941Smrg * All Rights Reserved.
4b8e80941Smrg *
5b8e80941Smrg * Permission is hereby granted, free of charge, to any person obtaining
6b8e80941Smrg * a copy of this software and associated documentation files (the
7b8e80941Smrg * "Software"), to deal in the Software without restriction, including
8b8e80941Smrg * without limitation the rights to use, copy, modify, merge, publish,
9b8e80941Smrg * distribute, sub license, and/or sell copies of the Software, and to
10b8e80941Smrg * permit persons to whom the Software is furnished to do so, subject to
11b8e80941Smrg * the following conditions:
12b8e80941Smrg *
13b8e80941Smrg * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
14b8e80941Smrg * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
15b8e80941Smrg * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
16b8e80941Smrg * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
17b8e80941Smrg * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18b8e80941Smrg * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
19b8e80941Smrg * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20b8e80941Smrg * USE OR OTHER DEALINGS IN THE SOFTWARE.
21b8e80941Smrg *
22b8e80941Smrg * The above copyright notice and this permission notice (including the
23b8e80941Smrg * next paragraph) shall be included in all copies or substantial portions
24b8e80941Smrg * of the Software.
25b8e80941Smrg */
26b8e80941Smrg
27b8e80941Smrg#include "u_queue.h"
28b8e80941Smrg
29b8e80941Smrg#include <time.h>
30b8e80941Smrg
31b8e80941Smrg#include "util/os_time.h"
32b8e80941Smrg#include "util/u_string.h"
33b8e80941Smrg#include "util/u_thread.h"
34b8e80941Smrg#include "u_process.h"
35b8e80941Smrg
36b8e80941Smrgstatic void
37b8e80941Smrgutil_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
38b8e80941Smrg                        bool finish_locked);
39b8e80941Smrg
40b8e80941Smrg/****************************************************************************
41b8e80941Smrg * Wait for all queues to assert idle when exit() is called.
42b8e80941Smrg *
43b8e80941Smrg * Otherwise, C++ static variable destructors can be called while threads
44b8e80941Smrg * are using the static variables.
45b8e80941Smrg */
46b8e80941Smrg
static once_flag atexit_once_flag = ONCE_FLAG_INIT;
/* All live queues, linked through util_queue::head; guarded by exit_mutex. */
static struct list_head queue_list;
static mtx_t exit_mutex = _MTX_INITIALIZER_NP;

/* NOTE(review): defined unconditionally here, so the handler always runs as
 * an ELF destructor instead of via atexit() -- presumably a local platform
 * patch; confirm this is intentional. */
#define HAVE_NOATEXIT
#if defined(HAVE_NOATEXIT)
/* Destructors run even if global_init never did; this flag lets the handler
 * bail out before touching the uninitialized queue_list. */
static int global_init_called = 0;

static void __attribute__((__destructor__))
#else
static void
#endif
atexit_handler(void)
{
   struct util_queue *iter;

#if defined(HAVE_NOATEXIT)
   if (!global_init_called)
      return;
#endif

   mtx_lock(&exit_mutex);
   /* Wait for all queues to assert idle. */
   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
      util_queue_kill_threads(iter, 0, false);
   }
   mtx_unlock(&exit_mutex);
}
75b8e80941Smrg
/* One-time process-wide setup, invoked through call_once from
 * add_to_atexit_list. */
static void
global_init(void)
{
   LIST_INITHEAD(&queue_list);
#if defined(HAVE_NOATEXIT)
   /* The destructor-based atexit_handler keys off this flag instead of an
    * atexit() registration. */
   global_init_called = 1;
#else
   atexit(atexit_handler);
#endif
}
86b8e80941Smrg
/* Register a queue so atexit_handler can drain it at process exit.
 * Lazily performs the one-time global setup first. */
static void
add_to_atexit_list(struct util_queue *queue)
{
   call_once(&atexit_once_flag, global_init);

   mtx_lock(&exit_mutex);
   LIST_ADD(&queue->head, &queue_list);
   mtx_unlock(&exit_mutex);
}
96b8e80941Smrg
97b8e80941Smrgstatic void
98b8e80941Smrgremove_from_atexit_list(struct util_queue *queue)
99b8e80941Smrg{
100b8e80941Smrg   struct util_queue *iter, *tmp;
101b8e80941Smrg
102b8e80941Smrg   mtx_lock(&exit_mutex);
103b8e80941Smrg   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
104b8e80941Smrg      if (iter == queue) {
105b8e80941Smrg         LIST_DEL(&iter->head);
106b8e80941Smrg         break;
107b8e80941Smrg      }
108b8e80941Smrg   }
109b8e80941Smrg   mtx_unlock(&exit_mutex);
110b8e80941Smrg}
111b8e80941Smrg
112b8e80941Smrg/****************************************************************************
113b8e80941Smrg * util_queue_fence
114b8e80941Smrg */
115b8e80941Smrg
116b8e80941Smrg#ifdef UTIL_QUEUE_FENCE_FUTEX
/* Wait until the futex-based fence becomes signalled (val == 0), optionally
 * with a timeout.
 *
 * Value protocol (inferred from this function): 0 = signalled,
 * 1 = unsignalled with no waiters, 2 = unsignalled with waiters present.
 * Returns true if the fence was signalled, false on timeout.
 */
static bool
do_futex_fence_wait(struct util_queue_fence *fence,
                    bool timeout, int64_t abs_timeout)
{
   uint32_t v = fence->val;
   struct timespec ts;
   /* abs_timeout is in nanoseconds; split into timespec fields. */
   ts.tv_sec = abs_timeout / (1000*1000*1000);
   ts.tv_nsec = abs_timeout % (1000*1000*1000);

   while (v != 0) {
      if (v != 2) {
         /* Announce a waiter by moving 1 -> 2. p_atomic_cmpxchg returns the
          * previous value, so 0 means the fence was signalled meanwhile. */
         v = p_atomic_cmpxchg(&fence->val, 1, 2);
         if (v == 0)
            return true;
      }

      /* Sleep while the value is still 2.
       * NOTE(review): ts is built directly from abs_timeout, which is only
       * correct if futex_wait interprets the timeout as absolute on the same
       * clock as os_time_get_nano() -- confirm against the futex_wait
       * wrapper for this platform. */
      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
      if (timeout && r < 0) {
         if (errno == ETIMEDOUT)
            return false;
      }

      /* Re-read; we may have been woken spuriously or by a signal. */
      v = fence->val;
   }

   return true;
}
144b8e80941Smrg
/* Block (without timeout) until the fence is signalled. */
void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   do_futex_fence_wait(fence, false, 0);
}
150b8e80941Smrg
/* Block until the fence is signalled or abs_timeout (nanoseconds, on the
 * os_time_* clock) passes. Returns true if the fence was signalled. */
bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   return do_futex_fence_wait(fence, true, abs_timeout);
}
157b8e80941Smrg
158b8e80941Smrg#endif
159b8e80941Smrg
160b8e80941Smrg#ifdef UTIL_QUEUE_FENCE_STANDARD
/* Mark the fence signalled and wake all waiters.
 * The flag is set under the mutex so a waiter cannot miss the broadcast
 * between testing `signalled` and calling cnd_wait. */
void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   fence->signalled = true;
   cnd_broadcast(&fence->cond);
   mtx_unlock(&fence->mutex);
}
169b8e80941Smrg
/* Block until the fence is signalled (condition-variable implementation).
 * The while loop guards against spurious wakeups. */
void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   while (!fence->signalled)
      cnd_wait(&fence->cond, &fence->mutex);
   mtx_unlock(&fence->mutex);
}
178b8e80941Smrg
179b8e80941Smrgbool
180b8e80941Smrg_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
181b8e80941Smrg                               int64_t abs_timeout)
182b8e80941Smrg{
183b8e80941Smrg   /* This terrible hack is made necessary by the fact that we really want an
184b8e80941Smrg    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
185b8e80941Smrg    * to be relative to the TIME_UTC clock.
186b8e80941Smrg    */
187b8e80941Smrg   int64_t rel = abs_timeout - os_time_get_nano();
188b8e80941Smrg
189b8e80941Smrg   if (rel > 0) {
190b8e80941Smrg      struct timespec ts;
191b8e80941Smrg
192b8e80941Smrg      timespec_get(&ts, TIME_UTC);
193b8e80941Smrg
194b8e80941Smrg      ts.tv_sec += abs_timeout / (1000*1000*1000);
195b8e80941Smrg      ts.tv_nsec += abs_timeout % (1000*1000*1000);
196b8e80941Smrg      if (ts.tv_nsec >= (1000*1000*1000)) {
197b8e80941Smrg         ts.tv_sec++;
198b8e80941Smrg         ts.tv_nsec -= (1000*1000*1000);
199b8e80941Smrg      }
200b8e80941Smrg
201b8e80941Smrg      mtx_lock(&fence->mutex);
202b8e80941Smrg      while (!fence->signalled) {
203b8e80941Smrg         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
204b8e80941Smrg            break;
205b8e80941Smrg      }
206b8e80941Smrg      mtx_unlock(&fence->mutex);
207b8e80941Smrg   }
208b8e80941Smrg
209b8e80941Smrg   return fence->signalled;
210b8e80941Smrg}
211b8e80941Smrg
/* Initialize a fence in the *signalled* state; callers reset it to
 * unsignalled when submitting a job. */
void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   (void) mtx_init(&fence->mutex, mtx_plain);
   cnd_init(&fence->cond);
   fence->signalled = true;
}
220b8e80941Smrg
/* Destroy a fence. The fence must already be signalled. */
void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);

   /* Ensure that another thread is not in the middle of
    * util_queue_fence_signal (having set the fence to signalled but still
    * holding the fence mutex).
    *
    * A common contract between threads is that as soon as a fence is signalled
    * by thread A, thread B is allowed to destroy it. Since
    * util_queue_fence_is_signalled does not lock the fence mutex (for
    * performance reasons), we must do so here.
    */
   mtx_lock(&fence->mutex);
   mtx_unlock(&fence->mutex);

   cnd_destroy(&fence->cond);
   mtx_destroy(&fence->mutex);
}
241b8e80941Smrg#endif
242b8e80941Smrg
243b8e80941Smrg/****************************************************************************
244b8e80941Smrg * util_queue implementation
245b8e80941Smrg */
246b8e80941Smrg
/* Heap-allocated startup argument for a worker thread; the worker copies
 * the fields out and frees it (see util_queue_thread_func). */
struct thread_input {
   struct util_queue *queue;
   int thread_index;
};
251b8e80941Smrg
/* Worker thread main loop: pop jobs off the ring buffer and execute them
 * until this thread's index is no longer below queue->num_threads. */
static int
util_queue_thread_func(void *input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   /* Allocated by util_queue_create_thread; ours to free. */
   free(input);

#ifdef HAVE_PTHREAD_SETAFFINITY
   if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
      /* Don't inherit the thread affinity from the parent thread.
       * Set the full mask.
       */
#if defined(__NetBSD__)
      cpuset_t *cpuset;
      cpuset = cpuset_create();
      if (cpuset != NULL) {
         cpuset_zero(cpuset);
         for (unsigned i = 0; i < cpuset_size(cpuset); i++)
            cpuset_set(i, cpuset);

         pthread_setaffinity_np(pthread_self(), cpuset_size(cpuset), cpuset);
         cpuset_destroy(cpuset);
      }
#else
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      for (unsigned i = 0; i < CPU_SETSIZE; i++)
         CPU_SET(i, &cpuset);

      pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
#endif
   }
#endif

   if (strlen(queue->name) > 0) {
      /* 16-byte buffer matches the common kernel thread-name limit; the
       * combined "<queue name><index>" string may be truncated. */
      char name[16];
      util_snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
      u_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (thread_index < queue->num_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);

      /* only kill threads that are above "num_threads" */
      if (thread_index >= queue->num_threads) {
         mtx_unlock(&queue->lock);
         break;
      }

      /* Pop the oldest job; clear the slot so drop/terminate logic sees it
       * as empty. */
      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);
      mtx_unlock(&queue->lock);

      /* A NULL job is a slot cleared by util_queue_drop_job; skip it. */
      if (job.job) {
         job.execute(job.job, thread_index);
         util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, thread_index);
      }
   }

   /* signal remaining jobs if all threads are being terminated */
   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      for (unsigned i = queue->read_idx; i != queue->write_idx;
           i = (i + 1) % queue->max_jobs) {
         if (queue->jobs[i].job) {
            util_queue_fence_signal(queue->jobs[i].fence);
            queue->jobs[i].job = NULL;
         }
      }
      queue->read_idx = queue->write_idx;
      queue->num_queued = 0;
   }
   mtx_unlock(&queue->lock);
   return 0;
}
341b8e80941Smrg
342b8e80941Smrgstatic bool
343b8e80941Smrgutil_queue_create_thread(struct util_queue *queue, unsigned index)
344b8e80941Smrg{
345b8e80941Smrg   struct thread_input *input =
346b8e80941Smrg      (struct thread_input *) malloc(sizeof(struct thread_input));
347b8e80941Smrg   input->queue = queue;
348b8e80941Smrg   input->thread_index = index;
349b8e80941Smrg
350b8e80941Smrg   queue->threads[index] = u_thread_create(util_queue_thread_func, input);
351b8e80941Smrg
352b8e80941Smrg   if (!queue->threads[index]) {
353b8e80941Smrg      free(input);
354b8e80941Smrg      return false;
355b8e80941Smrg   }
356b8e80941Smrg
357b8e80941Smrg   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
358b8e80941Smrg#if defined(__linux__) && defined(SCHED_IDLE)
359b8e80941Smrg      struct sched_param sched_param = {0};
360b8e80941Smrg
361b8e80941Smrg      /* The nice() function can only set a maximum of 19.
362b8e80941Smrg       * SCHED_IDLE is the same as nice = 20.
363b8e80941Smrg       *
364b8e80941Smrg       * Note that Linux only allows decreasing the priority. The original
365b8e80941Smrg       * priority can't be restored.
366b8e80941Smrg       */
367b8e80941Smrg      pthread_setschedparam(queue->threads[index], SCHED_IDLE, &sched_param);
368b8e80941Smrg#endif
369b8e80941Smrg   }
370b8e80941Smrg   return true;
371b8e80941Smrg}
372b8e80941Smrg
373b8e80941Smrgvoid
374b8e80941Smrgutil_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
375b8e80941Smrg{
376b8e80941Smrg   num_threads = MIN2(num_threads, queue->max_threads);
377b8e80941Smrg   num_threads = MAX2(num_threads, 1);
378b8e80941Smrg
379b8e80941Smrg   mtx_lock(&queue->finish_lock);
380b8e80941Smrg   unsigned old_num_threads = queue->num_threads;
381b8e80941Smrg
382b8e80941Smrg   if (num_threads == old_num_threads) {
383b8e80941Smrg      mtx_unlock(&queue->finish_lock);
384b8e80941Smrg      return;
385b8e80941Smrg   }
386b8e80941Smrg
387b8e80941Smrg   if (num_threads < old_num_threads) {
388b8e80941Smrg      util_queue_kill_threads(queue, num_threads, true);
389b8e80941Smrg      mtx_unlock(&queue->finish_lock);
390b8e80941Smrg      return;
391b8e80941Smrg   }
392b8e80941Smrg
393b8e80941Smrg   /* Create threads.
394b8e80941Smrg    *
395b8e80941Smrg    * We need to update num_threads first, because threads terminate
396b8e80941Smrg    * when thread_index < num_threads.
397b8e80941Smrg    */
398b8e80941Smrg   queue->num_threads = num_threads;
399b8e80941Smrg   for (unsigned i = old_num_threads; i < num_threads; i++) {
400b8e80941Smrg      if (!util_queue_create_thread(queue, i))
401b8e80941Smrg         break;
402b8e80941Smrg   }
403b8e80941Smrg   mtx_unlock(&queue->finish_lock);
404b8e80941Smrg}
405b8e80941Smrg
/* Initialize a queue with a ring buffer of `max_jobs` slots and
 * `num_threads` worker threads. Returns false (and leaves the queue
 * zeroed) on failure. */
bool
util_queue_init(struct util_queue *queue,
                const char *name,
                unsigned max_jobs,
                unsigned num_threads,
                unsigned flags)
{
   unsigned i;

   /* Form the thread name from process_name and name, limited to 13
    * characters. Characters 14-15 are reserved for the thread number.
    * Character 16 should be 0. Final form: "process:name12"
    *
    * If name is too long, it's truncated. If any space is left, the process
    * name fills it.
    */
   const char *process_name = util_get_process_name();
   int process_len = process_name ? strlen(process_name) : 0;
   int name_len = strlen(name);
   const int max_chars = sizeof(queue->name) - 1;

   name_len = MIN2(name_len, max_chars);

   /* See if there is any space left for the process name, reserve 1 for
    * the colon. */
   process_len = MIN2(process_len, max_chars - name_len - 1);
   process_len = MAX2(process_len, 0);

   memset(queue, 0, sizeof(*queue));

   if (process_len) {
      util_snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
                    process_len, process_name, name);
   } else {
      util_snprintf(queue->name, sizeof(queue->name), "%s", name);
   }

   queue->flags = flags;
   queue->max_threads = num_threads;
   queue->num_threads = num_threads;
   queue->max_jobs = max_jobs;

   queue->jobs = (struct util_queue_job*)
                 calloc(max_jobs, sizeof(struct util_queue_job));
   if (!queue->jobs)
      goto fail;

   (void) mtx_init(&queue->lock, mtx_plain);
   (void) mtx_init(&queue->finish_lock, mtx_plain);

   queue->num_queued = 0;
   cnd_init(&queue->has_queued_cond);
   cnd_init(&queue->has_space_cond);

   queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t));
   if (!queue->threads)
      goto fail;

   /* start threads */
   for (i = 0; i < num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         if (i == 0) {
            /* no threads created, fail */
            goto fail;
         } else {
            /* at least one thread created, so use it */
            queue->num_threads = i;
            break;
         }
      }
   }

   /* Register for draining at process exit (see atexit_handler). */
   add_to_atexit_list(queue);
   return true;

fail:
   /* free(NULL) is a no-op, so this is safe even if allocation never got
    * that far. */
   free(queue->threads);

   if (queue->jobs) {
      cnd_destroy(&queue->has_space_cond);
      cnd_destroy(&queue->has_queued_cond);
      mtx_destroy(&queue->lock);
      free(queue->jobs);
   }
   /* also util_queue_is_initialized can be used to check for success */
   memset(queue, 0, sizeof(*queue));
   return false;
}
494b8e80941Smrg
495b8e80941Smrgstatic void
496b8e80941Smrgutil_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
497b8e80941Smrg                        bool finish_locked)
498b8e80941Smrg{
499b8e80941Smrg   unsigned i;
500b8e80941Smrg
501b8e80941Smrg   /* Signal all threads to terminate. */
502b8e80941Smrg   if (!finish_locked)
503b8e80941Smrg      mtx_lock(&queue->finish_lock);
504b8e80941Smrg
505b8e80941Smrg   if (keep_num_threads >= queue->num_threads) {
506b8e80941Smrg      mtx_unlock(&queue->finish_lock);
507b8e80941Smrg      return;
508b8e80941Smrg   }
509b8e80941Smrg
510b8e80941Smrg   mtx_lock(&queue->lock);
511b8e80941Smrg   unsigned old_num_threads = queue->num_threads;
512b8e80941Smrg   /* Setting num_threads is what causes the threads to terminate.
513b8e80941Smrg    * Then cnd_broadcast wakes them up and they will exit their function.
514b8e80941Smrg    */
515b8e80941Smrg   queue->num_threads = keep_num_threads;
516b8e80941Smrg   cnd_broadcast(&queue->has_queued_cond);
517b8e80941Smrg   mtx_unlock(&queue->lock);
518b8e80941Smrg
519b8e80941Smrg   for (i = keep_num_threads; i < old_num_threads; i++)
520b8e80941Smrg      thrd_join(queue->threads[i], NULL);
521b8e80941Smrg
522b8e80941Smrg   if (!finish_locked)
523b8e80941Smrg      mtx_unlock(&queue->finish_lock);
524b8e80941Smrg}
525b8e80941Smrg
/* Tear down a queue: stop and join all workers (signalling any remaining
 * job fences), unregister from the exit handler, then destroy the
 * synchronization objects and free the buffers. */
void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_kill_threads(queue, 0, false);
   remove_from_atexit_list(queue);

   cnd_destroy(&queue->has_space_cond);
   cnd_destroy(&queue->has_queued_cond);
   mtx_destroy(&queue->finish_lock);
   mtx_destroy(&queue->lock);
   free(queue->jobs);
   free(queue->threads);
}
539b8e80941Smrg
540b8e80941Smrgvoid
541b8e80941Smrgutil_queue_add_job(struct util_queue *queue,
542b8e80941Smrg                   void *job,
543b8e80941Smrg                   struct util_queue_fence *fence,
544b8e80941Smrg                   util_queue_execute_func execute,
545b8e80941Smrg                   util_queue_execute_func cleanup)
546b8e80941Smrg{
547b8e80941Smrg   struct util_queue_job *ptr;
548b8e80941Smrg
549b8e80941Smrg   mtx_lock(&queue->lock);
550b8e80941Smrg   if (queue->num_threads == 0) {
551b8e80941Smrg      mtx_unlock(&queue->lock);
552b8e80941Smrg      /* well no good option here, but any leaks will be
553b8e80941Smrg       * short-lived as things are shutting down..
554b8e80941Smrg       */
555b8e80941Smrg      return;
556b8e80941Smrg   }
557b8e80941Smrg
558b8e80941Smrg   util_queue_fence_reset(fence);
559b8e80941Smrg
560b8e80941Smrg   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
561b8e80941Smrg
562b8e80941Smrg   if (queue->num_queued == queue->max_jobs) {
563b8e80941Smrg      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) {
564b8e80941Smrg         /* If the queue is full, make it larger to avoid waiting for a free
565b8e80941Smrg          * slot.
566b8e80941Smrg          */
567b8e80941Smrg         unsigned new_max_jobs = queue->max_jobs + 8;
568b8e80941Smrg         struct util_queue_job *jobs =
569b8e80941Smrg            (struct util_queue_job*)calloc(new_max_jobs,
570b8e80941Smrg                                           sizeof(struct util_queue_job));
571b8e80941Smrg         assert(jobs);
572b8e80941Smrg
573b8e80941Smrg         /* Copy all queued jobs into the new list. */
574b8e80941Smrg         unsigned num_jobs = 0;
575b8e80941Smrg         unsigned i = queue->read_idx;
576b8e80941Smrg
577b8e80941Smrg         do {
578b8e80941Smrg            jobs[num_jobs++] = queue->jobs[i];
579b8e80941Smrg            i = (i + 1) % queue->max_jobs;
580b8e80941Smrg         } while (i != queue->write_idx);
581b8e80941Smrg
582b8e80941Smrg         assert(num_jobs == queue->num_queued);
583b8e80941Smrg
584b8e80941Smrg         free(queue->jobs);
585b8e80941Smrg         queue->jobs = jobs;
586b8e80941Smrg         queue->read_idx = 0;
587b8e80941Smrg         queue->write_idx = num_jobs;
588b8e80941Smrg         queue->max_jobs = new_max_jobs;
589b8e80941Smrg      } else {
590b8e80941Smrg         /* Wait until there is a free slot. */
591b8e80941Smrg         while (queue->num_queued == queue->max_jobs)
592b8e80941Smrg            cnd_wait(&queue->has_space_cond, &queue->lock);
593b8e80941Smrg      }
594b8e80941Smrg   }
595b8e80941Smrg
596b8e80941Smrg   ptr = &queue->jobs[queue->write_idx];
597b8e80941Smrg   assert(ptr->job == NULL);
598b8e80941Smrg   ptr->job = job;
599b8e80941Smrg   ptr->fence = fence;
600b8e80941Smrg   ptr->execute = execute;
601b8e80941Smrg   ptr->cleanup = cleanup;
602b8e80941Smrg   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
603b8e80941Smrg
604b8e80941Smrg   queue->num_queued++;
605b8e80941Smrg   cnd_signal(&queue->has_queued_cond);
606b8e80941Smrg   mtx_unlock(&queue->lock);
607b8e80941Smrg}
608b8e80941Smrg
609b8e80941Smrg/**
610b8e80941Smrg * Remove a queued job. If the job hasn't started execution, it's removed from
611b8e80941Smrg * the queue. If the job has started execution, the function waits for it to
612b8e80941Smrg * complete.
613b8e80941Smrg *
614b8e80941Smrg * In all cases, the fence is signalled when the function returns.
615b8e80941Smrg *
616b8e80941Smrg * The function can be used when destroying an object associated with the job
617b8e80941Smrg * when you don't care about the job completion state.
618b8e80941Smrg */
619b8e80941Smrgvoid
620b8e80941Smrgutil_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
621b8e80941Smrg{
622b8e80941Smrg   bool removed = false;
623b8e80941Smrg
624b8e80941Smrg   if (util_queue_fence_is_signalled(fence))
625b8e80941Smrg      return;
626b8e80941Smrg
627b8e80941Smrg   mtx_lock(&queue->lock);
628b8e80941Smrg   for (unsigned i = queue->read_idx; i != queue->write_idx;
629b8e80941Smrg        i = (i + 1) % queue->max_jobs) {
630b8e80941Smrg      if (queue->jobs[i].fence == fence) {
631b8e80941Smrg         if (queue->jobs[i].cleanup)
632b8e80941Smrg            queue->jobs[i].cleanup(queue->jobs[i].job, -1);
633b8e80941Smrg
634b8e80941Smrg         /* Just clear it. The threads will treat as a no-op job. */
635b8e80941Smrg         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
636b8e80941Smrg         removed = true;
637b8e80941Smrg         break;
638b8e80941Smrg      }
639b8e80941Smrg   }
640b8e80941Smrg   mtx_unlock(&queue->lock);
641b8e80941Smrg
642b8e80941Smrg   if (removed)
643b8e80941Smrg      util_queue_fence_signal(fence);
644b8e80941Smrg   else
645b8e80941Smrg      util_queue_fence_wait(fence);
646b8e80941Smrg}
647b8e80941Smrg
648b8e80941Smrgstatic void
649b8e80941Smrgutil_queue_finish_execute(void *data, int num_thread)
650b8e80941Smrg{
651b8e80941Smrg   util_barrier *barrier = data;
652b8e80941Smrg   util_barrier_wait(barrier);
653b8e80941Smrg}
654b8e80941Smrg
/**
 * Wait until all previously added jobs have completed.
 */
void
util_queue_finish(struct util_queue *queue)
{
   util_barrier barrier;
   struct util_queue_fence *fences;

   /* If 2 threads were adding jobs for 2 different barriers at the same time,
    * a deadlock would happen, because 1 barrier requires that all threads
    * wait for it exclusively.
    */
   mtx_lock(&queue->finish_lock);
   /* NOTE(review): malloc result is used unchecked below -- a failed
    * allocation would crash here; consider handling OOM. */
   fences = malloc(queue->num_threads * sizeof(*fences));
   util_barrier_init(&barrier, queue->num_threads);

   /* One barrier job per worker: every thread must reach the barrier, which
    * can only happen after it has drained everything queued before. */
   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL);
   }

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_wait(&fences[i]);
      util_queue_fence_destroy(&fences[i]);
   }
   mtx_unlock(&queue->finish_lock);

   util_barrier_destroy(&barrier);

   free(fences);
}
687b8e80941Smrg
688b8e80941Smrgint64_t
689b8e80941Smrgutil_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
690b8e80941Smrg{
691b8e80941Smrg   /* Allow some flexibility by not raising an error. */
692b8e80941Smrg   if (thread_index >= queue->num_threads)
693b8e80941Smrg      return 0;
694b8e80941Smrg
695b8e80941Smrg   return u_thread_get_time_nano(queue->threads[thread_index]);
696b8e80941Smrg}
697