Lines matching refs:queue in src/util/u_queue.c (Mesa); each line below is prefixed with its line number in that file.
37 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
88 add_to_atexit_list(struct util_queue *queue)
93 LIST_ADD(&queue->head, &queue_list);
98 remove_from_atexit_list(struct util_queue *queue)
104 if (iter == queue) {
248 struct util_queue *queue;
255 struct util_queue *queue = ((struct thread_input*)input)->queue;
261 if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
287 if (strlen(queue->name) > 0) {
289 util_snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
296 mtx_lock(&queue->lock);
297 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
299 /* wait if the queue is empty */
300 while (thread_index < queue->num_threads && queue->num_queued == 0)
301 cnd_wait(&queue->has_queued_cond, &queue->lock);
304 if (thread_index >= queue->num_threads) {
305 mtx_unlock(&queue->lock);
309 job = queue->jobs[queue->read_idx];
310 memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
311 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
313 queue->num_queued--;
314 cnd_signal(&queue->has_space_cond);
315 mtx_unlock(&queue->lock);
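Matches 296-315 are the worker's consume path: lock, sleep on has_queued_cond while the ring is empty, pop the job at read_idx, then signal has_space_cond for any blocked producer. A minimal self-contained sketch of that pattern, assuming C11 threads; struct ring and pop_one are illustrative stand-ins, not u_queue API:

    #include <stdbool.h>
    #include <threads.h>

    struct ring {
       mtx_t lock;
       cnd_t has_queued, has_space;
       int *slots;
       unsigned read_idx, num_queued, max, num_threads;
    };

    /* Returns false when the worker has been told to exit. */
    static bool pop_one(struct ring *r, unsigned thread_index, int *out)
    {
       mtx_lock(&r->lock);

       /* Sleep while there is nothing to do; re-check num_threads on
        * every wakeup so a shrinking pool can retire this worker
        * (the predicate at match 300). */
       while (thread_index < r->num_threads && r->num_queued == 0)
          cnd_wait(&r->has_queued, &r->lock);

       if (thread_index >= r->num_threads) {   /* retired by kill_threads */
          mtx_unlock(&r->lock);
          return false;
       }

       *out = r->slots[r->read_idx];
       r->read_idx = (r->read_idx + 1) % r->max;
       r->num_queued--;
       cnd_signal(&r->has_space);   /* one producer may now enqueue */
       mtx_unlock(&r->lock);
       return true;
    }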
326 mtx_lock(&queue->lock);
327 if (queue->num_threads == 0) {
328 for (unsigned i = queue->read_idx; i != queue->write_idx;
329 i = (i + 1) % queue->max_jobs) {
330 if (queue->jobs[i].job) {
331 util_queue_fence_signal(queue->jobs[i].fence);
332 queue->jobs[i].job = NULL;
335 queue->read_idx = queue->write_idx;
336 queue->num_queued = 0;
338 mtx_unlock(&queue->lock);
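Matches 326-338 are the last-worker drain: once num_threads reaches zero, every job still in the live region [read_idx, write_idx) has its fence signaled so no caller waits forever on a job that will never run. Consolidated into one readable unit (types and helpers as named in the matches; assumes queue->lock is held, as match 326 shows):

    /* Drain a queue whose last worker is exiting: wake every waiter,
     * then mark the ring empty. Mirrors matches 326-338. */
    static void drain_pending(struct util_queue *queue)
    {
       for (unsigned i = queue->read_idx; i != queue->write_idx;
            i = (i + 1) % queue->max_jobs) {
          if (queue->jobs[i].job) {
             util_queue_fence_signal(queue->jobs[i].fence);
             queue->jobs[i].job = NULL;
          }
       }
       queue->read_idx = queue->write_idx;
       queue->num_queued = 0;
    }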
343 util_queue_create_thread(struct util_queue *queue, unsigned index)
347 input->queue = queue;
350 queue->threads[index] = u_thread_create(util_queue_thread_func, input);
352 if (!queue->threads[index]) {
357 if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
367 pthread_setschedparam(queue->threads[index], SCHED_IDLE, &sched_param);
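Matches 357-367: under UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY the new worker is demoted to SCHED_IDLE, the Linux policy that runs only when the CPU is otherwise idle. A minimal sketch (Linux-specific; SCHED_IDLE requires a static priority of 0):

    #define _GNU_SOURCE   /* SCHED_IDLE is a Linux extension */
    #include <pthread.h>
    #include <sched.h>
    #include <string.h>

    /* Demote a thread to SCHED_IDLE; returns 0 on success. */
    static int set_idle_priority(pthread_t thread)
    {
       struct sched_param sched_param;
       memset(&sched_param, 0, sizeof(sched_param));  /* priority = 0 */
       return pthread_setschedparam(thread, SCHED_IDLE, &sched_param);
    }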
374 util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
376 num_threads = MIN2(num_threads, queue->max_threads);
379 mtx_lock(&queue->finish_lock);
380 unsigned old_num_threads = queue->num_threads;
383 mtx_unlock(&queue->finish_lock);
388 util_queue_kill_threads(queue, num_threads, true);
389 mtx_unlock(&queue->finish_lock);
398 queue->num_threads = num_threads;
400 if (!util_queue_create_thread(queue, i))
403 mtx_unlock(&queue->finish_lock);
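Matches 374-403 dispatch a pool resize under finish_lock: clamp the target to max_threads, then either retire surplus workers via util_queue_kill_threads or spawn new ones from the old count up to the target. A sketch with an illustrative wrapper name; reading the `true` argument as "finish_lock already held" is an assumption consistent with the lock pattern above:

    /* Illustrative wrapper around the resize logic in matches 374-403;
     * error handling beyond stopping the spawn loop early is elided. */
    static void pool_set_num_threads(struct util_queue *queue, unsigned n)
    {
       n = MIN2(n, queue->max_threads);

       mtx_lock(&queue->finish_lock);
       unsigned old = queue->num_threads;

       if (n < old) {
          /* Shrink: wake and join workers [n, old). */
          util_queue_kill_threads(queue, n, true);
       } else {
          /* Grow: publish the new count first so fresh workers pass
           * the thread_index < num_threads check, then spawn. */
          queue->num_threads = n;
          for (unsigned i = old; i < n; i++) {
             if (!util_queue_create_thread(queue, i))
                break;
          }
       }
       mtx_unlock(&queue->finish_lock);
    }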
407 util_queue_init(struct util_queue *queue,
425 const int max_chars = sizeof(queue->name) - 1;
434 memset(queue, 0, sizeof(*queue));
437 util_snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
440 util_snprintf(queue->name, sizeof(queue->name), "%s", name);
443 queue->flags = flags;
444 queue->max_threads = num_threads;
445 queue->num_threads = num_threads;
446 queue->max_jobs = max_jobs;
448 queue->jobs = (struct util_queue_job*)
450 if (!queue->jobs)
453 (void) mtx_init(&queue->lock, mtx_plain);
454 (void) mtx_init(&queue->finish_lock, mtx_plain);
456 queue->num_queued = 0;
457 cnd_init(&queue->has_queued_cond);
458 cnd_init(&queue->has_space_cond);
460 queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t));
461 if (!queue->threads)
466 if (!util_queue_create_thread(queue, i)) {
472 queue->num_threads = i;
478 add_to_atexit_list(queue);
482 free(queue->threads);
484 if (queue->jobs) {
485 cnd_destroy(&queue->has_space_cond);
486 cnd_destroy(&queue->has_queued_cond);
487 mtx_destroy(&queue->lock);
488 free(queue->jobs);
491 memset(queue, 0, sizeof(*queue));
496 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
503 mtx_lock(&queue->finish_lock);
505 if (keep_num_threads >= queue->num_threads) {
506 mtx_unlock(&queue->finish_lock);
510 mtx_lock(&queue->lock);
511 unsigned old_num_threads = queue->num_threads;
515 queue->num_threads = keep_num_threads;
516 cnd_broadcast(&queue->has_queued_cond);
517 mtx_unlock(&queue->lock);
520 thrd_join(queue->threads[i], NULL);
523 mtx_unlock(&queue->finish_lock);
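Matches 496-523 are the other half of the shutdown handshake begun in the worker loop: lower num_threads under queue->lock, broadcast has_queued_cond so every sleeping worker re-evaluates `thread_index < num_threads`, then join the retired threads. Sketched with an illustrative wrapper (the real function also takes a flag for whether finish_lock is already held):

    /* Illustrative shrink handshake, simplified from matches 496-523. */
    static void pool_retire_workers(struct util_queue *queue, unsigned keep)
    {
       mtx_lock(&queue->lock);
       unsigned old = queue->num_threads;
       queue->num_threads = keep;
       cnd_broadcast(&queue->has_queued_cond);   /* wake all sleepers */
       mtx_unlock(&queue->lock);

       /* Workers with thread_index >= keep see the shrunken count on
        * wakeup, unlock, and return; join them here. */
       for (unsigned i = keep; i < old; i++)
          thrd_join(queue->threads[i], NULL);
    }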
527 util_queue_destroy(struct util_queue *queue)
529 util_queue_kill_threads(queue, 0, false);
530 remove_from_atexit_list(queue);
532 cnd_destroy(&queue->has_space_cond);
533 cnd_destroy(&queue->has_queued_cond);
534 mtx_destroy(&queue->finish_lock);
535 mtx_destroy(&queue->lock);
536 free(queue->jobs);
537 free(queue->threads);
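Matches 407-537 together cover the lifecycle: init builds the name, job ring, and workers (unwinding in reverse order on failure, matches 482-491), and destroy retires all workers (keep_num_threads = 0), drops the atexit entry, and destroys the sync objects before freeing the arrays. A hypothetical end-to-end caller; the five-argument util_queue_add_job is confirmed by match 674, but the util_queue_init argument order (name, max_jobs, num_threads, flags) and the header name are inferred, not shown:

    #include <stdio.h>
    #include "u_queue.h"   /* assumed header name */

    struct work { int unit; };

    /* Runs on a worker thread; thread_index is the worker's slot. */
    static void work_execute(void *job, int thread_index)
    {
       struct work *w = job;
       printf("unit %d ran on worker %d\n", w->unit, thread_index);
    }

    int main(void)
    {
       struct util_queue q;
       struct util_queue_fence fence;
       struct work w = { 7 };

       /* Argument order is an inference from matches 407-445. */
       if (!util_queue_init(&q, "demo", /*max_jobs*/ 8, /*num_threads*/ 2, 0))
          return 1;

       util_queue_fence_init(&fence);
       util_queue_add_job(&q, &w, &fence, work_execute, NULL);
       util_queue_fence_wait(&fence);    /* block until work_execute ran */
       util_queue_fence_destroy(&fence);

       util_queue_finish(&q);            /* drain anything still queued */
       util_queue_destroy(&q);
       return 0;
    }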
541 util_queue_add_job(struct util_queue *queue,
549 mtx_lock(&queue->lock);
550 if (queue->num_threads == 0) {
551 mtx_unlock(&queue->lock);
560 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
562 if (queue->num_queued == queue->max_jobs) {
563 if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) {
564 /* If the queue is full, make it larger to avoid waiting for a free slot. */
567 unsigned new_max_jobs = queue->max_jobs + 8;
575 unsigned i = queue->read_idx;
578 jobs[num_jobs++] = queue->jobs[i];
579 i = (i + 1) % queue->max_jobs;
580 } while (i != queue->write_idx);
582 assert(num_jobs == queue->num_queued);
584 free(queue->jobs);
585 queue->jobs = jobs;
586 queue->read_idx = 0;
587 queue->write_idx = num_jobs;
588 queue->max_jobs = new_max_jobs;
591 while (queue->num_queued == queue->max_jobs)
592 cnd_wait(&queue->has_space_cond, &queue->lock);
596 ptr = &queue->jobs[queue->write_idx];
602 queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
604 queue->num_queued++;
605 cnd_signal(&queue->has_queued_cond);
606 mtx_unlock(&queue->lock);
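Matches 541-606 are the producer path. With UTIL_QUEUE_INIT_RESIZE_IF_FULL, a full ring grows by 8 slots, and because the live region of a ring can wrap, the jobs are copied out in consumption order so the new array starts linear; that is what lets read_idx reset to 0 and write_idx to num_jobs at matches 586-587. A standalone sketch of the re-linearizing copy, with illustrative names:

    #include <stdlib.h>
    #include <string.h>

    /* Grow a wrapped ring of `elem_size`-byte elements into a fresh,
     * linear array. Returns the new array, or NULL on failure. */
    static void *ring_grow_linear(void *old, unsigned max, unsigned count,
                                  unsigned read_idx, size_t elem_size,
                                  unsigned new_max)
    {
       char *dst = calloc(new_max, elem_size);
       if (!dst)
          return NULL;

       /* Copy in consumption order: read_idx to the end, then wrap. */
       for (unsigned n = 0; n < count; n++) {
          unsigned src = (read_idx + n) % max;
          memcpy(dst + n * elem_size,
                 (char *)old + src * elem_size, elem_size);
       }
       free(old);
       return dst;   /* caller resets read_idx = 0, write_idx = count */
    }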
611 * the queue. If the job has started execution, the function waits for it to complete.
620 util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
627 mtx_lock(&queue->lock);
628 for (unsigned i = queue->read_idx; i != queue->write_idx;
629 i = (i + 1) % queue->max_jobs) {
630 if (queue->jobs[i].fence == fence) {
631 if (queue->jobs[i].cleanup)
632 queue->jobs[i].cleanup(queue->jobs[i].job, -1);
635 memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
640 mtx_unlock(&queue->lock);
659 util_queue_finish(struct util_queue *queue)
668 mtx_lock(&queue->finish_lock);
669 fences = malloc(queue->num_threads * sizeof(*fences));
670 util_barrier_init(&barrier, queue->num_threads);
672 for (unsigned i = 0; i < queue->num_threads; ++i) {
674 util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL);
677 for (unsigned i = 0; i < queue->num_threads; ++i) {
681 mtx_unlock(&queue->finish_lock);
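Matches 659-681 implement finish by flooding the pool: one barrier job per worker, so no worker can pass the barrier until all of them have entered it, which in turn means every job queued earlier has been consumed; the loop at 677 then presumably waits on each fence. The job body implied by match 674 is essentially a single barrier wait (a sketch; the real body is not among the matches, and util_barrier_wait is assumed as the counterpart of the util_barrier_init visible at 670):

    /* Sketch of the per-thread finish job: each worker parks in the
     * shared barrier until all num_threads workers have arrived. */
    static void util_queue_finish_execute(void *data, int num_thread)
    {
       (void)num_thread;
       util_barrier_wait((util_barrier *)data);
    }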
689 util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
692 if (thread_index >= queue->num_threads)
695 return u_thread_get_time_nano(queue->threads[thread_index]);