Home | History | Annotate | Line # | Download | only in isc
      1  1.1  christos /*	$NetBSD: loop.c,v 1.3 2025/05/21 14:48:04 christos Exp $	*/
      2  1.1  christos 
      3  1.1  christos /*
      4  1.1  christos  * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      5  1.1  christos  *
      6  1.1  christos  * SPDX-License-Identifier: MPL-2.0
      7  1.1  christos  *
      8  1.1  christos  * This Source Code Form is subject to the terms of the Mozilla Public
      9  1.1  christos  * License, v. 2.0. If a copy of the MPL was not distributed with this
     10  1.1  christos  * file, you can obtain one at https://mozilla.org/MPL/2.0/.
     11  1.1  christos  *
     12  1.1  christos  * See the COPYRIGHT file distributed with this work for additional
     13  1.1  christos  * information regarding copyright ownership.
     14  1.1  christos  */
     15  1.1  christos 
     16  1.1  christos #include <stdlib.h>
     17  1.1  christos #include <sys/types.h>
     18  1.1  christos #include <unistd.h>
     19  1.1  christos 
     20  1.1  christos #include <isc/async.h>
     21  1.1  christos #include <isc/atomic.h>
     22  1.1  christos #include <isc/barrier.h>
     23  1.1  christos #include <isc/condition.h>
     24  1.1  christos #include <isc/job.h>
     25  1.1  christos #include <isc/list.h>
     26  1.1  christos #include <isc/log.h>
     27  1.1  christos #include <isc/loop.h>
     28  1.1  christos #include <isc/magic.h>
     29  1.1  christos #include <isc/mem.h>
     30  1.1  christos #include <isc/mutex.h>
     31  1.1  christos #include <isc/refcount.h>
     32  1.1  christos #include <isc/result.h>
     33  1.1  christos #include <isc/signal.h>
     34  1.1  christos #include <isc/strerr.h>
     35  1.1  christos #include <isc/thread.h>
     36  1.1  christos #include <isc/tid.h>
     37  1.1  christos #include <isc/time.h>
     38  1.1  christos #include <isc/urcu.h>
     39  1.1  christos #include <isc/util.h>
     40  1.1  christos #include <isc/uv.h>
     41  1.1  christos #include <isc/work.h>
     42  1.1  christos 
     43  1.1  christos #include "async_p.h"
     44  1.1  christos #include "job_p.h"
     45  1.1  christos #include "loop_p.h"
     46  1.1  christos 
     47  1.1  christos /**
     48  1.1  christos  * Private
     49  1.1  christos  */
     50  1.1  christos 
     51  1.1  christos thread_local isc_loop_t *isc__loop_local = NULL;
     52  1.1  christos 
     53  1.1  christos static void
     54  1.1  christos ignore_signal(int sig, void (*handler)(int)) {
     55  1.1  christos 	struct sigaction sa = { .sa_handler = handler };
     56  1.1  christos 
     57  1.1  christos 	if (sigfillset(&sa.sa_mask) != 0 || sigaction(sig, &sa, NULL) < 0) {
     58  1.1  christos 		FATAL_SYSERROR(errno, "ignore_signal(%d)", sig);
     59  1.1  christos 	}
     60  1.1  christos }
     61  1.1  christos 
     62  1.1  christos void
     63  1.1  christos isc_loopmgr_shutdown(isc_loopmgr_t *loopmgr) {
     64  1.1  christos 	if (!atomic_compare_exchange_strong(&loopmgr->shuttingdown,
     65  1.1  christos 					    &(bool){ false }, true))
     66  1.1  christos 	{
     67  1.1  christos 		return;
     68  1.1  christos 	}
     69  1.1  christos 
     70  1.1  christos 	for (size_t i = 0; i < loopmgr->nloops; i++) {
     71  1.1  christos 		isc_loop_t *loop = &loopmgr->loops[i];
     72  1.1  christos 		int r;
     73  1.1  christos 
     74  1.1  christos 		r = uv_async_send(&loop->shutdown_trigger);
     75  1.1  christos 		UV_RUNTIME_CHECK(uv_async_send, r);
     76  1.1  christos 	}
     77  1.1  christos }
     78  1.1  christos 
     79  1.1  christos static void
     80  1.1  christos isc__loopmgr_signal(void *arg, int signum) {
     81  1.1  christos 	isc_loopmgr_t *loopmgr = (isc_loopmgr_t *)arg;
     82  1.1  christos 
     83  1.1  christos 	switch (signum) {
     84  1.1  christos 	case SIGINT:
     85  1.1  christos 	case SIGTERM:
     86  1.1  christos 		isc_loopmgr_shutdown(loopmgr);
     87  1.1  christos 		break;
     88  1.1  christos 	default:
     89  1.1  christos 		UNREACHABLE();
     90  1.1  christos 	}
     91  1.1  christos }
     92  1.1  christos 
     93  1.1  christos static void
     94  1.1  christos pause_loop(isc_loop_t *loop) {
     95  1.1  christos 	isc_loopmgr_t *loopmgr = loop->loopmgr;
     96  1.1  christos 
     97  1.1  christos 	rcu_thread_offline();
     98  1.1  christos 
     99  1.1  christos 	loop->paused = true;
    100  1.1  christos 	(void)isc_barrier_wait(&loopmgr->pausing);
    101  1.1  christos }
    102  1.1  christos 
    103  1.1  christos static void
    104  1.1  christos resume_loop(isc_loop_t *loop) {
    105  1.1  christos 	isc_loopmgr_t *loopmgr = loop->loopmgr;
    106  1.1  christos 
    107  1.1  christos 	(void)isc_barrier_wait(&loopmgr->resuming);
    108  1.1  christos 	loop->paused = false;
    109  1.1  christos 
    110  1.1  christos 	rcu_thread_online();
    111  1.1  christos }
    112  1.1  christos 
    113  1.1  christos static void
    114  1.1  christos pauseresume_cb(uv_async_t *handle) {
    115  1.1  christos 	isc_loop_t *loop = uv_handle_get_data(handle);
    116  1.1  christos 
    117  1.1  christos 	pause_loop(loop);
    118  1.1  christos 	resume_loop(loop);
    119  1.1  christos }
    120  1.1  christos 
    121  1.1  christos #define XX(uc, lc)                                                         \
    122  1.1  christos 	case UV_##uc:                                                      \
    123  1.1  christos 		fprintf(stderr, "%s, %s: dangling %p: %p.type = %s\n",     \
    124  1.1  christos 			__func__, (char *)arg, handle->loop, handle, #lc); \
    125  1.1  christos 		break;
    126  1.1  christos 
    127  1.1  christos static void
    128  1.1  christos loop_walk_cb(uv_handle_t *handle, void *arg) {
    129  1.1  christos 	if (uv_is_closing(handle)) {
    130  1.1  christos 		return;
    131  1.1  christos 	}
    132  1.1  christos 
    133  1.1  christos 	switch (handle->type) {
    134  1.1  christos 		UV_HANDLE_TYPE_MAP(XX)
    135  1.1  christos 	default:
    136  1.1  christos 		fprintf(stderr, "%s, %s: dangling %p: %p.type = %s\n", __func__,
    137  1.1  christos 			(char *)arg, &handle->loop, handle, "unknown");
    138  1.1  christos 	}
    139  1.1  christos }
    140  1.1  christos 
    141  1.1  christos static void
    142  1.1  christos shutdown_trigger_close_cb(uv_handle_t *handle) {
    143  1.1  christos 	isc_loop_t *loop = uv_handle_get_data(handle);
    144  1.1  christos 
    145  1.1  christos 	isc_loop_detach(&loop);
    146  1.1  christos }
    147  1.1  christos 
    148  1.1  christos static void
    149  1.1  christos destroy_cb(uv_async_t *handle) {
    150  1.1  christos 	isc_loop_t *loop = uv_handle_get_data(handle);
    151  1.1  christos 
    152  1.1  christos 	/* Again, the first close callback here is called last */
    153  1.1  christos 	uv_close(&loop->async_trigger, isc__async_close);
    154  1.1  christos 	uv_close(&loop->run_trigger, isc__job_close);
    155  1.1  christos 	uv_close(&loop->destroy_trigger, NULL);
    156  1.1  christos 	uv_close(&loop->pause_trigger, NULL);
    157  1.1  christos 	uv_close(&loop->quiescent, NULL);
    158  1.1  christos 
    159  1.1  christos 	uv_walk(&loop->loop, loop_walk_cb, (char *)"destroy_cb");
    160  1.1  christos }
    161  1.1  christos 
    162  1.1  christos static void
    163  1.1  christos shutdown_cb(uv_async_t *handle) {
    164  1.1  christos 	isc_loop_t *loop = uv_handle_get_data(handle);
    165  1.1  christos 	isc_loopmgr_t *loopmgr = loop->loopmgr;
    166  1.1  christos 
    167  1.1  christos 	/* Make sure, we can't be called again */
    168  1.1  christos 	uv_close(&loop->shutdown_trigger, shutdown_trigger_close_cb);
    169  1.1  christos 
    170  1.3  christos 	/* Mark this loop as shutting down */
    171  1.3  christos 	loop->shuttingdown = true;
    172  1.3  christos 
    173  1.1  christos 	if (DEFAULT_LOOP(loopmgr) == CURRENT_LOOP(loopmgr)) {
    174  1.1  christos 		/* Stop the signal handlers */
    175  1.1  christos 		isc_signal_stop(loopmgr->sigterm);
    176  1.1  christos 		isc_signal_stop(loopmgr->sigint);
    177  1.1  christos 
    178  1.1  christos 		/* Free the signal handlers */
    179  1.1  christos 		isc_signal_destroy(&loopmgr->sigterm);
    180  1.1  christos 		isc_signal_destroy(&loopmgr->sigint);
    181  1.1  christos 	}
    182  1.1  christos 
    183  1.1  christos 	enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
    184  1.1  christos 		&loop->async_jobs.head, &loop->async_jobs.tail,
    185  1.1  christos 		&loop->teardown_jobs.head, &loop->teardown_jobs.tail);
    186  1.1  christos 	INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
    187  1.1  christos 	int r = uv_async_send(&loop->async_trigger);
    188  1.1  christos 	UV_RUNTIME_CHECK(uv_async_send, r);
    189  1.1  christos }
    190  1.1  christos 
    191  1.1  christos static void
    192  1.1  christos loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid,
    193  1.1  christos 	  const char *kind) {
    194  1.1  christos 	*loop = (isc_loop_t){
    195  1.1  christos 		.tid = tid,
    196  1.1  christos 		.loopmgr = loopmgr,
    197  1.1  christos 		.run_jobs = ISC_LIST_INITIALIZER,
    198  1.1  christos 	};
    199  1.1  christos 
    200  1.1  christos 	__cds_wfcq_init(&loop->async_jobs.head, &loop->async_jobs.tail);
    201  1.1  christos 	__cds_wfcq_init(&loop->setup_jobs.head, &loop->setup_jobs.tail);
    202  1.1  christos 	__cds_wfcq_init(&loop->teardown_jobs.head, &loop->teardown_jobs.tail);
    203  1.1  christos 
    204  1.1  christos 	int r = uv_loop_init(&loop->loop);
    205  1.1  christos 	UV_RUNTIME_CHECK(uv_loop_init, r);
    206  1.1  christos 
    207  1.1  christos 	r = uv_async_init(&loop->loop, &loop->pause_trigger, pauseresume_cb);
    208  1.1  christos 	UV_RUNTIME_CHECK(uv_async_init, r);
    209  1.1  christos 	uv_handle_set_data(&loop->pause_trigger, loop);
    210  1.1  christos 
    211  1.1  christos 	r = uv_async_init(&loop->loop, &loop->shutdown_trigger, shutdown_cb);
    212  1.1  christos 	UV_RUNTIME_CHECK(uv_async_init, r);
    213  1.1  christos 	uv_handle_set_data(&loop->shutdown_trigger, loop);
    214  1.1  christos 
    215  1.1  christos 	r = uv_async_init(&loop->loop, &loop->async_trigger, isc__async_cb);
    216  1.1  christos 	UV_RUNTIME_CHECK(uv_async_init, r);
    217  1.1  christos 	uv_handle_set_data(&loop->async_trigger, loop);
    218  1.1  christos 
    219  1.1  christos 	r = uv_idle_init(&loop->loop, &loop->run_trigger);
    220  1.1  christos 	UV_RUNTIME_CHECK(uv_idle_init, r);
    221  1.1  christos 	uv_handle_set_data(&loop->run_trigger, loop);
    222  1.1  christos 
    223  1.1  christos 	r = uv_async_init(&loop->loop, &loop->destroy_trigger, destroy_cb);
    224  1.1  christos 	UV_RUNTIME_CHECK(uv_async_init, r);
    225  1.1  christos 	uv_handle_set_data(&loop->destroy_trigger, loop);
    226  1.1  christos 
    227  1.1  christos 	r = uv_prepare_init(&loop->loop, &loop->quiescent);
    228  1.1  christos 	UV_RUNTIME_CHECK(uv_prepare_init, r);
    229  1.1  christos 	uv_handle_set_data(&loop->quiescent, loop);
    230  1.1  christos 
    231  1.1  christos 	char name[16];
    232  1.1  christos 	snprintf(name, sizeof(name), "%s-%08" PRIx32, kind, tid);
    233  1.1  christos 	isc_mem_create(&loop->mctx);
    234  1.1  christos 	isc_mem_setname(loop->mctx, name);
    235  1.1  christos 
    236  1.1  christos 	isc_refcount_init(&loop->references, 1);
    237  1.1  christos 
    238  1.1  christos 	loop->magic = LOOP_MAGIC;
    239  1.1  christos }
    240  1.1  christos 
    241  1.1  christos static void
    242  1.1  christos quiescent_cb(uv_prepare_t *handle) {
    243  1.1  christos 	UNUSED(handle);
    244  1.1  christos 
    245  1.1  christos #if defined(RCU_QSBR)
    246  1.1  christos 	/* safe memory reclamation */
    247  1.1  christos 	rcu_quiescent_state();
    248  1.1  christos 
    249  1.1  christos 	/* mark the thread offline when polling */
    250  1.1  christos 	rcu_thread_offline();
    251  1.1  christos #else
    252  1.1  christos 	INSIST(!rcu_read_ongoing());
    253  1.1  christos #endif
    254  1.1  christos }
    255  1.1  christos 
    256  1.1  christos static void
    257  1.1  christos helper_close(isc_loop_t *loop) {
    258  1.1  christos 	int r = uv_loop_close(&loop->loop);
    259  1.1  christos 	UV_RUNTIME_CHECK(uv_loop_close, r);
    260  1.1  christos 
    261  1.1  christos 	INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail));
    262  1.1  christos 
    263  1.1  christos 	isc_mem_detach(&loop->mctx);
    264  1.1  christos }
    265  1.1  christos 
    266  1.1  christos static void
    267  1.1  christos loop_close(isc_loop_t *loop) {
    268  1.1  christos 	int r = uv_loop_close(&loop->loop);
    269  1.1  christos 	UV_RUNTIME_CHECK(uv_loop_close, r);
    270  1.1  christos 
    271  1.1  christos 	INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail));
    272  1.1  christos 	INSIST(ISC_LIST_EMPTY(loop->run_jobs));
    273  1.1  christos 
    274  1.1  christos 	loop->magic = 0;
    275  1.1  christos 
    276  1.1  christos 	isc_mem_detach(&loop->mctx);
    277  1.1  christos }
    278  1.1  christos 
    279  1.1  christos static void *
    280  1.1  christos helper_thread(void *arg) {
    281  1.1  christos 	isc_loop_t *helper = (isc_loop_t *)arg;
    282  1.1  christos 
    283  1.1  christos 	int r = uv_prepare_start(&helper->quiescent, quiescent_cb);
    284  1.1  christos 	UV_RUNTIME_CHECK(uv_prepare_start, r);
    285  1.1  christos 
    286  1.1  christos 	isc_barrier_wait(&helper->loopmgr->starting);
    287  1.1  christos 
    288  1.1  christos 	r = uv_run(&helper->loop, UV_RUN_DEFAULT);
    289  1.1  christos 	UV_RUNTIME_CHECK(uv_run, r);
    290  1.1  christos 
    291  1.1  christos 	/* Invalidate the helper early */
    292  1.1  christos 	helper->magic = 0;
    293  1.1  christos 
    294  1.1  christos 	isc_barrier_wait(&helper->loopmgr->stopping);
    295  1.1  christos 
    296  1.1  christos 	return NULL;
    297  1.1  christos }
    298  1.1  christos 
    299  1.1  christos static void *
    300  1.1  christos loop_thread(void *arg) {
    301  1.1  christos 	isc_loop_t *loop = (isc_loop_t *)arg;
    302  1.1  christos 	isc_loopmgr_t *loopmgr = loop->loopmgr;
    303  1.1  christos 	isc_loop_t *helper = &loopmgr->helpers[loop->tid];
    304  1.1  christos 	char name[32];
    305  1.1  christos 	/* Initialize the thread_local variables*/
    306  1.1  christos 
    307  1.1  christos 	REQUIRE(isc__loop_local == NULL || isc__loop_local == loop);
    308  1.1  christos 	isc__loop_local = loop;
    309  1.1  christos 
    310  1.1  christos 	isc__tid_init(loop->tid);
    311  1.1  christos 
    312  1.1  christos 	/* Start the helper thread */
    313  1.1  christos 	isc_thread_create(helper_thread, helper, &helper->thread);
    314  1.1  christos 	snprintf(name, sizeof(name), "isc-helper-%04" PRIu32, loop->tid);
    315  1.1  christos 	isc_thread_setname(helper->thread, name);
    316  1.1  christos 
    317  1.1  christos 	int r = uv_prepare_start(&loop->quiescent, quiescent_cb);
    318  1.1  christos 	UV_RUNTIME_CHECK(uv_prepare_start, r);
    319  1.1  christos 
    320  1.1  christos 	isc_barrier_wait(&loopmgr->starting);
    321  1.1  christos 
    322  1.1  christos 	enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
    323  1.1  christos 		&loop->async_jobs.head, &loop->async_jobs.tail,
    324  1.1  christos 		&loop->setup_jobs.head, &loop->setup_jobs.tail);
    325  1.1  christos 	INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
    326  1.1  christos 
    327  1.1  christos 	r = uv_async_send(&loop->async_trigger);
    328  1.1  christos 	UV_RUNTIME_CHECK(uv_async_send, r);
    329  1.1  christos 
    330  1.1  christos 	r = uv_run(&loop->loop, UV_RUN_DEFAULT);
    331  1.1  christos 	UV_RUNTIME_CHECK(uv_run, r);
    332  1.1  christos 
    333  1.1  christos 	isc__loop_local = NULL;
    334  1.1  christos 
    335  1.1  christos 	/* Invalidate the loop early */
    336  1.1  christos 	loop->magic = 0;
    337  1.1  christos 
    338  1.1  christos 	/* Shutdown the helper thread */
    339  1.1  christos 	r = uv_async_send(&helper->shutdown_trigger);
    340  1.1  christos 	UV_RUNTIME_CHECK(uv_async_send, r);
    341  1.1  christos 
    342  1.1  christos 	isc_barrier_wait(&loopmgr->stopping);
    343  1.1  christos 
    344  1.1  christos 	return NULL;
    345  1.1  christos }
    346  1.1  christos 
    347  1.1  christos /**
    348  1.1  christos  * Public
    349  1.1  christos  */
    350  1.1  christos 
    351  1.1  christos static void
    352  1.1  christos threadpool_initialize(uint32_t workers) {
    353  1.1  christos 	char buf[11];
    354  1.1  christos 	int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf,
    355  1.1  christos 			     &(size_t){ sizeof(buf) });
    356  1.1  christos 	if (r == UV_ENOENT) {
    357  1.1  christos 		snprintf(buf, sizeof(buf), "%" PRIu32, workers);
    358  1.1  christos 		uv_os_setenv("UV_THREADPOOL_SIZE", buf);
    359  1.1  christos 	}
    360  1.1  christos }
    361  1.1  christos 
    362  1.1  christos static void
    363  1.1  christos loop_destroy(isc_loop_t *loop) {
    364  1.1  christos 	int r = uv_async_send(&loop->destroy_trigger);
    365  1.1  christos 	UV_RUNTIME_CHECK(uv_async_send, r);
    366  1.1  christos }
    367  1.1  christos 
    368  1.1  christos #if ISC_LOOP_TRACE
    369  1.1  christos ISC_REFCOUNT_TRACE_IMPL(isc_loop, loop_destroy)
    370  1.1  christos #else
    371  1.1  christos ISC_REFCOUNT_IMPL(isc_loop, loop_destroy);
    372  1.1  christos #endif
    373  1.1  christos 
    374  1.1  christos void
    375  1.1  christos isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops, isc_loopmgr_t **loopmgrp) {
    376  1.1  christos 	isc_loopmgr_t *loopmgr = NULL;
    377  1.1  christos 
    378  1.1  christos 	REQUIRE(loopmgrp != NULL && *loopmgrp == NULL);
    379  1.1  christos 	REQUIRE(nloops > 0);
    380  1.1  christos 
    381  1.1  christos 	threadpool_initialize(nloops);
    382  1.1  christos 	isc__tid_initcount(nloops);
    383  1.1  christos 
    384  1.1  christos 	loopmgr = isc_mem_get(mctx, sizeof(*loopmgr));
    385  1.1  christos 	*loopmgr = (isc_loopmgr_t){
    386  1.1  christos 		.nloops = nloops,
    387  1.1  christos 	};
    388  1.1  christos 
    389  1.1  christos 	isc_mem_attach(mctx, &loopmgr->mctx);
    390  1.1  christos 
    391  1.1  christos 	/* We need to double the number for loops and helpers */
    392  1.1  christos 	isc_barrier_init(&loopmgr->pausing, loopmgr->nloops * 2);
    393  1.1  christos 	isc_barrier_init(&loopmgr->resuming, loopmgr->nloops * 2);
    394  1.1  christos 	isc_barrier_init(&loopmgr->starting, loopmgr->nloops * 2);
    395  1.1  christos 	isc_barrier_init(&loopmgr->stopping, loopmgr->nloops * 2);
    396  1.1  christos 
    397  1.1  christos 	loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
    398  1.1  christos 				      sizeof(loopmgr->loops[0]));
    399  1.1  christos 	for (size_t i = 0; i < loopmgr->nloops; i++) {
    400  1.1  christos 		isc_loop_t *loop = &loopmgr->loops[i];
    401  1.1  christos 		loop_init(loop, loopmgr, i, "loop");
    402  1.1  christos 	}
    403  1.1  christos 
    404  1.1  christos 	loopmgr->helpers = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
    405  1.1  christos 					sizeof(loopmgr->helpers[0]));
    406  1.1  christos 	for (size_t i = 0; i < loopmgr->nloops; i++) {
    407  1.1  christos 		isc_loop_t *loop = &loopmgr->helpers[i];
    408  1.1  christos 		loop_init(loop, loopmgr, i, "helper");
    409  1.1  christos 	}
    410  1.1  christos 
    411  1.1  christos 	loopmgr->sigint = isc_signal_new(loopmgr, isc__loopmgr_signal, loopmgr,
    412  1.1  christos 					 SIGINT);
    413  1.1  christos 	loopmgr->sigterm = isc_signal_new(loopmgr, isc__loopmgr_signal, loopmgr,
    414  1.1  christos 					  SIGTERM);
    415  1.1  christos 
    416  1.1  christos 	isc_signal_start(loopmgr->sigint);
    417  1.1  christos 	isc_signal_start(loopmgr->sigterm);
    418  1.1  christos 
    419  1.1  christos 	loopmgr->magic = LOOPMGR_MAGIC;
    420  1.1  christos 
    421  1.1  christos 	*loopmgrp = loopmgr;
    422  1.1  christos }
    423  1.1  christos 
    424  1.1  christos isc_job_t *
    425  1.1  christos isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
    426  1.1  christos 	REQUIRE(VALID_LOOP(loop));
    427  1.1  christos 	REQUIRE(cb != NULL);
    428  1.1  christos 
    429  1.1  christos 	isc_loopmgr_t *loopmgr = loop->loopmgr;
    430  1.1  christos 	isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
    431  1.1  christos 	*job = (isc_job_t){
    432  1.1  christos 		.cb = cb,
    433  1.1  christos 		.cbarg = cbarg,
    434  1.1  christos 	};
    435  1.1  christos 
    436  1.1  christos 	cds_wfcq_node_init(&job->wfcq_node);
    437  1.1  christos 
    438  1.1  christos 	REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
    439  1.1  christos 		atomic_load(&loopmgr->paused));
    440  1.1  christos 
    441  1.1  christos 	cds_wfcq_enqueue(&loop->setup_jobs.head, &loop->setup_jobs.tail,
    442  1.1  christos 			 &job->wfcq_node);
    443  1.1  christos 
    444  1.1  christos 	return job;
    445  1.1  christos }
    446  1.1  christos 
    447  1.1  christos isc_job_t *
    448  1.1  christos isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
    449  1.1  christos 	REQUIRE(VALID_LOOP(loop));
    450  1.1  christos 
    451  1.1  christos 	isc_loopmgr_t *loopmgr = loop->loopmgr;
    452  1.1  christos 	isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
    453  1.1  christos 	*job = (isc_job_t){
    454  1.1  christos 		.cb = cb,
    455  1.1  christos 		.cbarg = cbarg,
    456  1.1  christos 	};
    457  1.1  christos 	cds_wfcq_node_init(&job->wfcq_node);
    458  1.1  christos 
    459  1.1  christos 	REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
    460  1.1  christos 		atomic_load(&loopmgr->paused));
    461  1.1  christos 
    462  1.1  christos 	cds_wfcq_enqueue(&loop->teardown_jobs.head, &loop->teardown_jobs.tail,
    463  1.1  christos 			 &job->wfcq_node);
    464  1.1  christos 
    465  1.1  christos 	return job;
    466  1.1  christos }
    467  1.1  christos 
    468  1.1  christos void
    469  1.1  christos isc_loopmgr_setup(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg) {
    470  1.1  christos 	REQUIRE(VALID_LOOPMGR(loopmgr));
    471  1.1  christos 	REQUIRE(!atomic_load(&loopmgr->running) ||
    472  1.1  christos 		atomic_load(&loopmgr->paused));
    473  1.1  christos 
    474  1.1  christos 	for (size_t i = 0; i < loopmgr->nloops; i++) {
    475  1.1  christos 		isc_loop_t *loop = &loopmgr->loops[i];
    476  1.1  christos 		(void)isc_loop_setup(loop, cb, cbarg);
    477  1.1  christos 	}
    478  1.1  christos }
    479  1.1  christos 
    480  1.1  christos void
    481  1.1  christos isc_loopmgr_teardown(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg) {
    482  1.1  christos 	REQUIRE(VALID_LOOPMGR(loopmgr));
    483  1.1  christos 	REQUIRE(!atomic_load(&loopmgr->running) ||
    484  1.1  christos 		atomic_load(&loopmgr->paused));
    485  1.1  christos 
    486  1.1  christos 	for (size_t i = 0; i < loopmgr->nloops; i++) {
    487  1.1  christos 		isc_loop_t *loop = &loopmgr->loops[i];
    488  1.1  christos 		(void)isc_loop_teardown(loop, cb, cbarg);
    489  1.1  christos 	}
    490  1.1  christos }
    491  1.1  christos 
    492  1.1  christos void
    493  1.1  christos isc_loopmgr_run(isc_loopmgr_t *loopmgr) {
    494  1.1  christos 	REQUIRE(VALID_LOOPMGR(loopmgr));
    495  1.1  christos 	RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->running,
    496  1.1  christos 						     &(bool){ false }, true));
    497  1.1  christos 
    498  1.1  christos 	/*
    499  1.1  christos 	 * Always ignore SIGPIPE.
    500  1.1  christos 	 */
    501  1.1  christos 	ignore_signal(SIGPIPE, SIG_IGN);
    502  1.1  christos 
    503  1.1  christos 	/*
    504  1.1  christos 	 * The thread 0 is this one.
    505  1.1  christos 	 */
    506  1.1  christos 	for (size_t i = 1; i < loopmgr->nloops; i++) {
    507  1.1  christos 		char name[32];
    508  1.1  christos 		isc_loop_t *loop = &loopmgr->loops[i];
    509  1.1  christos 
    510  1.1  christos 		isc_thread_create(loop_thread, loop, &loop->thread);
    511  1.1  christos 
    512  1.1  christos 		snprintf(name, sizeof(name), "isc-loop-%04zu", i);
    513  1.1  christos 		isc_thread_setname(loop->thread, name);
    514  1.1  christos 	}
    515  1.1  christos 
    516  1.1  christos 	isc_thread_main(loop_thread, &loopmgr->loops[0]);
    517  1.1  christos }
    518  1.1  christos 
    519  1.1  christos void
    520  1.1  christos isc_loopmgr_pause(isc_loopmgr_t *loopmgr) {
    521  1.1  christos 	REQUIRE(VALID_LOOPMGR(loopmgr));
    522  1.1  christos 	REQUIRE(isc_tid() != ISC_TID_UNKNOWN);
    523  1.1  christos 
    524  1.1  christos 	if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) {
    525  1.1  christos 		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
    526  1.1  christos 			      ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1),
    527  1.1  christos 			      "loop exclusive mode: starting");
    528  1.1  christos 	}
    529  1.1  christos 
    530  1.1  christos 	for (size_t i = 0; i < loopmgr->nloops; i++) {
    531  1.1  christos 		isc_loop_t *helper = &loopmgr->helpers[i];
    532  1.1  christos 
    533  1.1  christos 		int r = uv_async_send(&helper->pause_trigger);
    534  1.1  christos 		UV_RUNTIME_CHECK(uv_async_send, r);
    535  1.1  christos 	}
    536  1.1  christos 
    537  1.1  christos 	for (size_t i = 0; i < loopmgr->nloops; i++) {
    538  1.1  christos 		isc_loop_t *loop = &loopmgr->loops[i];
    539  1.1  christos 
    540  1.1  christos 		/* Skip current loop */
    541  1.1  christos 		if (i == isc_tid()) {
    542  1.1  christos 			continue;
    543  1.1  christos 		}
    544  1.1  christos 
    545  1.1  christos 		int r = uv_async_send(&loop->pause_trigger);
    546  1.1  christos 		UV_RUNTIME_CHECK(uv_async_send, r);
    547  1.1  christos 	}
    548  1.1  christos 
    549  1.1  christos 	RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->paused,
    550  1.1  christos 						     &(bool){ false }, true));
    551  1.1  christos 	pause_loop(CURRENT_LOOP(loopmgr));
    552  1.1  christos 
    553  1.1  christos 	if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) {
    554  1.1  christos 		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
    555  1.1  christos 			      ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1),
    556  1.1  christos 			      "loop exclusive mode: started");
    557  1.1  christos 	}
    558  1.1  christos }
    559  1.1  christos 
    560  1.1  christos void
    561  1.1  christos isc_loopmgr_resume(isc_loopmgr_t *loopmgr) {
    562  1.1  christos 	REQUIRE(VALID_LOOPMGR(loopmgr));
    563  1.1  christos 
    564  1.1  christos 	if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) {
    565  1.1  christos 		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
    566  1.1  christos 			      ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1),
    567  1.1  christos 			      "loop exclusive mode: ending");
    568  1.1  christos 	}
    569  1.1  christos 
    570  1.1  christos 	RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->paused,
    571  1.1  christos 						     &(bool){ true }, false));
    572  1.1  christos 	resume_loop(CURRENT_LOOP(loopmgr));
    573  1.1  christos 
    574  1.1  christos 	if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) {
    575  1.1  christos 		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
    576  1.1  christos 			      ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1),
    577  1.1  christos 			      "loop exclusive mode: ended");
    578  1.1  christos 	}
    579  1.1  christos }
    580  1.1  christos 
    581  1.1  christos void
    582  1.1  christos isc_loopmgr_destroy(isc_loopmgr_t **loopmgrp) {
    583  1.1  christos 	isc_loopmgr_t *loopmgr = NULL;
    584  1.1  christos 
    585  1.1  christos 	REQUIRE(loopmgrp != NULL);
    586  1.1  christos 	REQUIRE(VALID_LOOPMGR(*loopmgrp));
    587  1.1  christos 
    588  1.1  christos 	loopmgr = *loopmgrp;
    589  1.1  christos 	*loopmgrp = NULL;
    590  1.1  christos 
    591  1.1  christos 	RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->running,
    592  1.1  christos 						     &(bool){ true }, false));
    593  1.1  christos 
    594  1.1  christos 	/* Wait for all helpers to finish */
    595  1.1  christos 	for (size_t i = 0; i < loopmgr->nloops; i++) {
    596  1.1  christos 		isc_loop_t *helper = &loopmgr->helpers[i];
    597  1.1  christos 		isc_thread_join(helper->thread, NULL);
    598  1.1  christos 	}
    599  1.1  christos 
    600  1.1  christos 	/* First wait for all loops to finish */
    601  1.1  christos 	for (size_t i = 1; i < loopmgr->nloops; i++) {
    602  1.1  christos 		isc_loop_t *loop = &loopmgr->loops[i];
    603  1.1  christos 		isc_thread_join(loop->thread, NULL);
    604  1.1  christos 	}
    605  1.1  christos 
    606  1.1  christos 	loopmgr->magic = 0;
    607  1.1  christos 
    608  1.1  christos 	for (size_t i = 0; i < loopmgr->nloops; i++) {
    609  1.1  christos 		isc_loop_t *helper = &loopmgr->helpers[i];
    610  1.1  christos 		helper_close(helper);
    611  1.1  christos 	}
    612  1.1  christos 	isc_mem_cput(loopmgr->mctx, loopmgr->helpers, loopmgr->nloops,
    613  1.1  christos 		     sizeof(loopmgr->helpers[0]));
    614  1.1  christos 
    615  1.1  christos 	for (size_t i = 0; i < loopmgr->nloops; i++) {
    616  1.1  christos 		isc_loop_t *loop = &loopmgr->loops[i];
    617  1.1  christos 		loop_close(loop);
    618  1.1  christos 	}
    619  1.1  christos 	isc_mem_cput(loopmgr->mctx, loopmgr->loops, loopmgr->nloops,
    620  1.1  christos 		     sizeof(loopmgr->loops[0]));
    621  1.1  christos 
    622  1.1  christos 	isc_barrier_destroy(&loopmgr->starting);
    623  1.1  christos 	isc_barrier_destroy(&loopmgr->stopping);
    624  1.1  christos 	isc_barrier_destroy(&loopmgr->resuming);
    625  1.1  christos 	isc_barrier_destroy(&loopmgr->pausing);
    626  1.1  christos 
    627  1.1  christos 	isc_mem_putanddetach(&loopmgr->mctx, loopmgr, sizeof(*loopmgr));
    628  1.1  christos }
    629  1.1  christos 
    630  1.1  christos uint32_t
    631  1.1  christos isc_loopmgr_nloops(isc_loopmgr_t *loopmgr) {
    632  1.1  christos 	REQUIRE(VALID_LOOPMGR(loopmgr));
    633  1.1  christos 
    634  1.1  christos 	return loopmgr->nloops;
    635  1.1  christos }
    636  1.1  christos 
    637  1.1  christos isc_mem_t *
    638  1.1  christos isc_loop_getmctx(isc_loop_t *loop) {
    639  1.1  christos 	REQUIRE(VALID_LOOP(loop));
    640  1.1  christos 
    641  1.1  christos 	return loop->mctx;
    642  1.1  christos }
    643  1.1  christos 
    644  1.1  christos isc_loop_t *
    645  1.1  christos isc_loop_main(isc_loopmgr_t *loopmgr) {
    646  1.1  christos 	REQUIRE(VALID_LOOPMGR(loopmgr));
    647  1.1  christos 
    648  1.1  christos 	return DEFAULT_LOOP(loopmgr);
    649  1.1  christos }
    650  1.1  christos 
    651  1.1  christos isc_loop_t *
    652  1.1  christos isc_loop_get(isc_loopmgr_t *loopmgr, uint32_t tid) {
    653  1.1  christos 	REQUIRE(VALID_LOOPMGR(loopmgr));
    654  1.1  christos 	REQUIRE(tid < loopmgr->nloops);
    655  1.1  christos 
    656  1.1  christos 	return LOOP(loopmgr, tid);
    657  1.1  christos }
    658  1.1  christos 
    659  1.1  christos void
    660  1.1  christos isc_loopmgr_blocking(isc_loopmgr_t *loopmgr) {
    661  1.1  christos 	REQUIRE(VALID_LOOPMGR(loopmgr));
    662  1.1  christos 
    663  1.1  christos 	isc_signal_stop(loopmgr->sigterm);
    664  1.1  christos 	isc_signal_stop(loopmgr->sigint);
    665  1.1  christos }
    666  1.1  christos 
    667  1.1  christos void
    668  1.1  christos isc_loopmgr_nonblocking(isc_loopmgr_t *loopmgr) {
    669  1.1  christos 	REQUIRE(VALID_LOOPMGR(loopmgr));
    670  1.1  christos 
    671  1.1  christos 	isc_signal_start(loopmgr->sigint);
    672  1.1  christos 	isc_signal_start(loopmgr->sigterm);
    673  1.1  christos }
    674  1.1  christos 
    675  1.1  christos isc_loopmgr_t *
    676  1.1  christos isc_loop_getloopmgr(isc_loop_t *loop) {
    677  1.1  christos 	REQUIRE(VALID_LOOP(loop));
    678  1.1  christos 
    679  1.1  christos 	return loop->loopmgr;
    680  1.1  christos }
    681  1.1  christos 
    682  1.1  christos isc_time_t
    683  1.1  christos isc_loop_now(isc_loop_t *loop) {
    684  1.1  christos 	REQUIRE(VALID_LOOP(loop));
    685  1.1  christos 
    686  1.1  christos 	uint64_t msec = uv_now(&loop->loop);
    687  1.1  christos 	isc_time_t t = {
    688  1.1  christos 		.seconds = msec / MS_PER_SEC,
    689  1.1  christos 		.nanoseconds = (msec % MS_PER_SEC) * NS_PER_MS,
    690  1.1  christos 	};
    691  1.1  christos 
    692  1.1  christos 	return t;
    693  1.1  christos }
    694  1.1  christos 
    695  1.1  christos bool
    696  1.1  christos isc_loop_shuttingdown(isc_loop_t *loop) {
    697  1.1  christos 	REQUIRE(VALID_LOOP(loop));
    698  1.1  christos 	REQUIRE(loop->tid == isc_tid());
    699  1.1  christos 
    700  1.1  christos 	return loop->shuttingdown;
    701  1.1  christos }
    702