Lines Matching refs:queue
47 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
98 add_to_atexit_list(struct util_queue *queue)
103 list_add(&queue->head, &queue_list);
108 remove_from_atexit_list(struct util_queue *queue)
114 if (iter == queue) {
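These two helpers keep every live queue on a global list so that a process-exit hook can stop all worker threads before global destructors run. A sketch of that hook, assuming Mesa's util/list.h iteration macro; the handler and mutex names are illustrative:

   #include "util/list.h"

   static struct list_head queue_list;   /* every initialized queue */
   static mtx_t exit_mutex;              /* guards queue_list */

   static void
   atexit_handler(void)
   {
      struct util_queue *iter;

      mtx_lock(&exit_mutex);
      /* Wind down all threads of every queue that is still alive. */
      LIST_FOR_EACH_ENTRY(iter, &queue_list, head)
         util_queue_kill_threads(iter, 0, false);
      mtx_unlock(&exit_mutex);
   }
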
262 struct util_queue *queue;
269 struct util_queue *queue = ((struct thread_input*)input)->queue;
274 if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
290 if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
296 if (strlen(queue->name) > 0) {
298 snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
305 mtx_lock(&queue->lock);
306 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
308 /* wait if the queue is empty */
309 while (thread_index < queue->num_threads && queue->num_queued == 0)
310 cnd_wait(&queue->has_queued_cond, &queue->lock);
313 if (thread_index >= queue->num_threads) {
314 mtx_unlock(&queue->lock);
318 job = queue->jobs[queue->read_idx];
319 memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
320 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
322 queue->num_queued--;
323 cnd_signal(&queue->has_space_cond);
325 queue->total_jobs_size -= job.job_size;
326 mtx_unlock(&queue->lock);
338 mtx_lock(&queue->lock);
339 if (queue->num_threads == 0) {
340 for (unsigned i = queue->read_idx; i != queue->write_idx;
341 i = (i + 1) % queue->max_jobs) {
342 if (queue->jobs[i].job) {
343 if (queue->jobs[i].fence)
344 util_queue_fence_signal(queue->jobs[i].fence);
345 queue->jobs[i].job = NULL;
348 queue->read_idx = queue->write_idx;
349 queue->num_queued = 0;
351 mtx_unlock(&queue->lock);
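Pieced together, lines 305-326 are a classic bounded-buffer consumer: sleep until work arrives, pop one job from the ring, wake a blocked producer, then run the job outside the lock. A condensed sketch of that shape (the execute call is an assumption; the callback fields live in struct util_queue_job):

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      /* Sleep while idle; a shrinking queue broadcasts this condition
       * so surplus workers can notice thread_index >= num_threads. */
      while (thread_index < queue->num_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);
      if (thread_index >= queue->num_threads) {
         mtx_unlock(&queue->lock);
         break;                                  /* told to exit */
      }

      job = queue->jobs[queue->read_idx];        /* pop from the ring */
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);        /* a slot freed up */
      mtx_unlock(&queue->lock);

      if (job.job)
         job.execute(job.job, job.global_data, thread_index);
   }
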
356 util_queue_create_thread(struct util_queue *queue, unsigned index)
360 input->queue = queue;
363 queue->threads[index] = u_thread_create(util_queue_thread_func, input);
365 if (!queue->threads[index]) {
370 if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
381 pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param);
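Line 381 drops a worker into Linux's batch scheduling class when UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY is set. Standalone, the call looks like this (assuming thread holds the worker's pthread_t; SCHED_BATCH exists only on Linux, hence the guard):

   #include <pthread.h>
   #include <sched.h>

   #if defined(__linux__) && defined(SCHED_BATCH)
      struct sched_param sched_param = {0};

      /* SCHED_BATCH ignores sched_priority, so a zeroed param suffices;
       * failure only means the thread keeps its default priority. */
      pthread_setschedparam(thread, SCHED_BATCH, &sched_param);
   #endif
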
388 util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
390 num_threads = MIN2(num_threads, queue->max_threads);
393 simple_mtx_lock(&queue->finish_lock);
394 unsigned old_num_threads = queue->num_threads;
397 simple_mtx_unlock(&queue->finish_lock);
402 util_queue_kill_threads(queue, num_threads, true);
403 simple_mtx_unlock(&queue->finish_lock);
412 queue->num_threads = num_threads;
414 if (!util_queue_create_thread(queue, i))
417 simple_mtx_unlock(&queue->finish_lock);
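The function clamps the request to max_threads and holds finish_lock across the resize so a concurrent util_queue_finish never sees a half-grown pool; shrinking is delegated to util_queue_kill_threads, growing spawns the missing workers. A hypothetical caller reacting to load (both conditions are illustrative):

   if (backlog_is_large)
      util_queue_adjust_num_threads(&queue, queue.max_threads);
   else if (queue_has_been_idle)
      util_queue_adjust_num_threads(&queue, 1);
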
421 util_queue_init(struct util_queue *queue,
440 const int max_chars = sizeof(queue->name) - 1;
449 memset(queue, 0, sizeof(*queue));
452 snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
455 snprintf(queue->name, sizeof(queue->name), "%s", name);
458 queue->flags = flags;
459 queue->max_threads = num_threads;
460 queue->num_threads = (flags & UTIL_QUEUE_INIT_SCALE_THREADS) ? 1 : num_threads;
461 queue->max_jobs = max_jobs;
462 queue->global_data = global_data;
464 (void) mtx_init(&queue->lock, mtx_plain);
465 (void) simple_mtx_init(&queue->finish_lock, mtx_plain);
467 queue->num_queued = 0;
468 cnd_init(&queue->has_queued_cond);
469 cnd_init(&queue->has_space_cond);
471 queue->jobs = (struct util_queue_job*)
473 if (!queue->jobs)
476 queue->threads = (thrd_t*) calloc(queue->max_threads, sizeof(thrd_t));
477 if (!queue->threads)
481 for (i = 0; i < queue->num_threads; i++) {
482 if (!util_queue_create_thread(queue, i)) {
488 queue->num_threads = i;
494 add_to_atexit_list(queue);
498 free(queue->threads);
500 if (queue->jobs) {
501 cnd_destroy(&queue->has_space_cond);
502 cnd_destroy(&queue->has_queued_cond);
503 mtx_destroy(&queue->lock);
504 free(queue->jobs);
507 memset(queue, 0, sizeof(*queue));
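From the fields filled in at lines 458-462, util_queue_init takes a queue name, the job-ring capacity, the thread count, flags, and an opaque global_data pointer passed to every callback. A minimal setup sketch; the bool return and exact parameter order are taken from Mesa's u_queue.h and should be treated as assumptions if your tree differs:

   #include "util/u_queue.h"

   struct util_queue queue;

   if (!util_queue_init(&queue, "compile", /*max_jobs=*/64,
                        /*num_threads=*/4,
                        UTIL_QUEUE_INIT_RESIZE_IF_FULL,
                        /*global_data=*/NULL)) {
      /* On failure the function releases what it allocated and zeroes
       * *queue (line 507), so later cleanup calls remain safe. */
      return false;
   }
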
512 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
519 simple_mtx_lock(&queue->finish_lock);
521 if (keep_num_threads >= queue->num_threads) {
522 simple_mtx_unlock(&queue->finish_lock);
526 mtx_lock(&queue->lock);
527 unsigned old_num_threads = queue->num_threads;
531 queue->num_threads = keep_num_threads;
532 cnd_broadcast(&queue->has_queued_cond);
533 mtx_unlock(&queue->lock);
536 thrd_join(queue->threads[i], NULL);
539 simple_mtx_unlock(&queue->finish_lock);
550 util_queue_destroy(struct util_queue *queue)
552 util_queue_kill_threads(queue, 0, false);
554 /* This makes it safe to call on a queue that failed util_queue_init. */
555 if (queue->head.next != NULL)
556 remove_from_atexit_list(queue);
558 cnd_destroy(&queue->has_space_cond);
559 cnd_destroy(&queue->has_queued_cond);
560 simple_mtx_destroy(&queue->finish_lock);
561 mtx_destroy(&queue->lock);
562 free(queue->jobs);
563 free(queue->threads);
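The head.next check at line 555 is what makes that comment true: a failed util_queue_init zeroes the struct (line 507), so the queue was never added to the atexit list and the removal is skipped. A hypothetical error path relying on that guarantee:

   struct util_queue queue;

   if (!util_queue_init(&queue, "worker", 32, 2, 0, NULL)) {
      /* Safe even though init failed: *queue is zeroed, and
       * util_queue_destroy skips the atexit-list removal. */
      util_queue_destroy(&queue);
   }
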
567 util_queue_add_job(struct util_queue *queue,
576 mtx_lock(&queue->lock);
577 if (queue->num_threads == 0) {
578 mtx_unlock(&queue->lock);
588 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
591 if (queue->num_queued == queue->max_jobs) {
592 if ((queue->flags & UTIL_QUEUE_INIT_SCALE_THREADS) &&
594 queue->num_threads < queue->max_threads) {
595 util_queue_adjust_num_threads(queue, queue->num_threads + 1);
598 if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL &&
599 queue->total_jobs_size + job_size < S_256MB) {
600 /* If the queue is full, make it larger to avoid waiting for a free
603 unsigned new_max_jobs = queue->max_jobs + 8;
611 unsigned i = queue->read_idx;
614 jobs[num_jobs++] = queue->jobs[i];
615 i = (i + 1) % queue->max_jobs;
616 } while (i != queue->write_idx);
618 assert(num_jobs == queue->num_queued);
620 free(queue->jobs);
621 queue->jobs = jobs;
622 queue->read_idx = 0;
623 queue->write_idx = num_jobs;
624 queue->max_jobs = new_max_jobs;
627 while (queue->num_queued == queue->max_jobs)
628 cnd_wait(&queue->has_space_cond, &queue->lock);
632 ptr = &queue->jobs[queue->write_idx];
635 ptr->global_data = queue->global_data;
641 queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
642 queue->total_jobs_size += ptr->job_size;
644 queue->num_queued++;
645 cnd_signal(&queue->has_queued_cond);
646 mtx_unlock(&queue->lock);
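A typical producer pairs util_queue_add_job with a fence so it can later wait for, or drop, the job. A sketch using the three-argument callback signature implied by line 672; the job struct and both function names are illustrative:

   struct my_job {
      struct util_queue_fence fence;
      int input, result;
   };

   static void
   my_execute(void *job, void *gdata, int thread_index)
   {
      struct my_job *j = job;
      j->result = j->input * 2;            /* stand-in for real work */
   }

   /* Producer side: */
   struct my_job j = { .input = 21 };
   util_queue_fence_init(&j.fence);
   util_queue_add_job(&queue, &j, &j.fence, my_execute,
                      /*cleanup=*/NULL, /*job_size=*/0);
   util_queue_fence_wait(&j.fence);        /* returns once my_execute ran */
   util_queue_fence_destroy(&j.fence);
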
651 * the queue. If the job has started execution, the function waits for it to
660 util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
667 mtx_lock(&queue->lock);
668 for (unsigned i = queue->read_idx; i != queue->write_idx;
669 i = (i + 1) % queue->max_jobs) {
670 if (queue->jobs[i].fence == fence) {
671 if (queue->jobs[i].cleanup)
672 queue->jobs[i].cleanup(queue->jobs[i].job, queue->global_data, -1);
675 memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
680 mtx_unlock(&queue->lock);
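util_queue_drop_job scans the ring for the job owning the given fence: if the job has not started, the entry is erased and its cleanup callback runs with thread_index == -1 (line 672); if it already started, the function waits for the fence instead. Illustrative cancellation, continuing the producer sketch above:

   /* Cancel j if it is still queued; otherwise wait for it to finish. */
   util_queue_drop_job(&queue, &j.fence);
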
692 util_queue_finish(struct util_queue *queue)
701 simple_mtx_lock(&queue->finish_lock);
704 if (!queue->num_threads) {
705 simple_mtx_unlock(&queue->finish_lock);
709 fences = malloc(queue->num_threads * sizeof(*fences));
710 util_barrier_init(&barrier, queue->num_threads);
712 for (unsigned i = 0; i < queue->num_threads; ++i) {
714 util_queue_add_job(queue, &barrier, &fences[i],
718 for (unsigned i = 0; i < queue->num_threads; ++i) {
722 simple_mtx_unlock(&queue->finish_lock);
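util_queue_finish drains the queue by submitting one barrier job per worker: no worker can pass the barrier until all of them reach it, so once the last fence signals, every previously queued job has completed. The barrier job itself never mentions "queue" and therefore does not appear in this match list, but it plausibly looks like:

   static void
   finish_execute(void *data, void *gdata, int num_thread)
   {
      util_barrier *barrier = data;

      /* Each worker parks here until all of them have drained their
       * earlier jobs; util_barrier_wait returns true in exactly one
       * thread, which then tears the barrier down. */
      if (util_barrier_wait(barrier))
         util_barrier_destroy(barrier);
   }
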
730 util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
733 if (thread_index >= queue->num_threads)
736 return util_thread_get_time_nano(queue->threads[thread_index]);
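util_queue_get_thread_time_nano reports per-worker CPU time for profiling; an out-of-range index is tolerated rather than treated as an error (line 733). A hypothetical profiling dump, assuming the int64_t nanosecond return type from u_queue.h:

   #include <inttypes.h>
   #include <stdio.h>

   for (unsigned i = 0; i < queue.num_threads; i++)
      printf("thread %u: %" PRId64 " ns\n", i,
             util_queue_get_thread_time_nano(&queue, i));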