1 1.1 christos /* $NetBSD: loop.c,v 1.3 2025/05/21 14:48:04 christos Exp $ */ 2 1.1 christos 3 1.1 christos /* 4 1.1 christos * Copyright (C) Internet Systems Consortium, Inc. ("ISC") 5 1.1 christos * 6 1.1 christos * SPDX-License-Identifier: MPL-2.0 7 1.1 christos * 8 1.1 christos * This Source Code Form is subject to the terms of the Mozilla Public 9 1.1 christos * License, v. 2.0. If a copy of the MPL was not distributed with this 10 1.1 christos * file, you can obtain one at https://mozilla.org/MPL/2.0/. 11 1.1 christos * 12 1.1 christos * See the COPYRIGHT file distributed with this work for additional 13 1.1 christos * information regarding copyright ownership. 14 1.1 christos */ 15 1.1 christos 16 1.1 christos #include <stdlib.h> 17 1.1 christos #include <sys/types.h> 18 1.1 christos #include <unistd.h> 19 1.1 christos 20 1.1 christos #include <isc/async.h> 21 1.1 christos #include <isc/atomic.h> 22 1.1 christos #include <isc/barrier.h> 23 1.1 christos #include <isc/condition.h> 24 1.1 christos #include <isc/job.h> 25 1.1 christos #include <isc/list.h> 26 1.1 christos #include <isc/log.h> 27 1.1 christos #include <isc/loop.h> 28 1.1 christos #include <isc/magic.h> 29 1.1 christos #include <isc/mem.h> 30 1.1 christos #include <isc/mutex.h> 31 1.1 christos #include <isc/refcount.h> 32 1.1 christos #include <isc/result.h> 33 1.1 christos #include <isc/signal.h> 34 1.1 christos #include <isc/strerr.h> 35 1.1 christos #include <isc/thread.h> 36 1.1 christos #include <isc/tid.h> 37 1.1 christos #include <isc/time.h> 38 1.1 christos #include <isc/urcu.h> 39 1.1 christos #include <isc/util.h> 40 1.1 christos #include <isc/uv.h> 41 1.1 christos #include <isc/work.h> 42 1.1 christos 43 1.1 christos #include "async_p.h" 44 1.1 christos #include "job_p.h" 45 1.1 christos #include "loop_p.h" 46 1.1 christos 47 1.1 christos /** 48 1.1 christos * Private 49 1.1 christos */ 50 1.1 christos 51 1.1 christos thread_local isc_loop_t *isc__loop_local = NULL; 52 1.1 christos 53 1.1 christos static void 54 1.1 christos ignore_signal(int sig, void (*handler)(int)) { 55 1.1 christos struct sigaction sa = { .sa_handler = handler }; 56 1.1 christos 57 1.1 christos if (sigfillset(&sa.sa_mask) != 0 || sigaction(sig, &sa, NULL) < 0) { 58 1.1 christos FATAL_SYSERROR(errno, "ignore_signal(%d)", sig); 59 1.1 christos } 60 1.1 christos } 61 1.1 christos 62 1.1 christos void 63 1.1 christos isc_loopmgr_shutdown(isc_loopmgr_t *loopmgr) { 64 1.1 christos if (!atomic_compare_exchange_strong(&loopmgr->shuttingdown, 65 1.1 christos &(bool){ false }, true)) 66 1.1 christos { 67 1.1 christos return; 68 1.1 christos } 69 1.1 christos 70 1.1 christos for (size_t i = 0; i < loopmgr->nloops; i++) { 71 1.1 christos isc_loop_t *loop = &loopmgr->loops[i]; 72 1.1 christos int r; 73 1.1 christos 74 1.1 christos r = uv_async_send(&loop->shutdown_trigger); 75 1.1 christos UV_RUNTIME_CHECK(uv_async_send, r); 76 1.1 christos } 77 1.1 christos } 78 1.1 christos 79 1.1 christos static void 80 1.1 christos isc__loopmgr_signal(void *arg, int signum) { 81 1.1 christos isc_loopmgr_t *loopmgr = (isc_loopmgr_t *)arg; 82 1.1 christos 83 1.1 christos switch (signum) { 84 1.1 christos case SIGINT: 85 1.1 christos case SIGTERM: 86 1.1 christos isc_loopmgr_shutdown(loopmgr); 87 1.1 christos break; 88 1.1 christos default: 89 1.1 christos UNREACHABLE(); 90 1.1 christos } 91 1.1 christos } 92 1.1 christos 93 1.1 christos static void 94 1.1 christos pause_loop(isc_loop_t *loop) { 95 1.1 christos isc_loopmgr_t *loopmgr = loop->loopmgr; 96 1.1 christos 97 1.1 christos rcu_thread_offline(); 98 1.1 christos 99 1.1 christos loop->paused = true; 100 1.1 christos (void)isc_barrier_wait(&loopmgr->pausing); 101 1.1 christos } 102 1.1 christos 103 1.1 christos static void 104 1.1 christos resume_loop(isc_loop_t *loop) { 105 1.1 christos isc_loopmgr_t *loopmgr = loop->loopmgr; 106 1.1 christos 107 1.1 christos (void)isc_barrier_wait(&loopmgr->resuming); 108 1.1 christos loop->paused = false; 109 1.1 christos 110 1.1 christos rcu_thread_online(); 111 1.1 christos } 112 1.1 christos 113 1.1 christos static void 114 1.1 christos pauseresume_cb(uv_async_t *handle) { 115 1.1 christos isc_loop_t *loop = uv_handle_get_data(handle); 116 1.1 christos 117 1.1 christos pause_loop(loop); 118 1.1 christos resume_loop(loop); 119 1.1 christos } 120 1.1 christos 121 1.1 christos #define XX(uc, lc) \ 122 1.1 christos case UV_##uc: \ 123 1.1 christos fprintf(stderr, "%s, %s: dangling %p: %p.type = %s\n", \ 124 1.1 christos __func__, (char *)arg, handle->loop, handle, #lc); \ 125 1.1 christos break; 126 1.1 christos 127 1.1 christos static void 128 1.1 christos loop_walk_cb(uv_handle_t *handle, void *arg) { 129 1.1 christos if (uv_is_closing(handle)) { 130 1.1 christos return; 131 1.1 christos } 132 1.1 christos 133 1.1 christos switch (handle->type) { 134 1.1 christos UV_HANDLE_TYPE_MAP(XX) 135 1.1 christos default: 136 1.1 christos fprintf(stderr, "%s, %s: dangling %p: %p.type = %s\n", __func__, 137 1.1 christos (char *)arg, &handle->loop, handle, "unknown"); 138 1.1 christos } 139 1.1 christos } 140 1.1 christos 141 1.1 christos static void 142 1.1 christos shutdown_trigger_close_cb(uv_handle_t *handle) { 143 1.1 christos isc_loop_t *loop = uv_handle_get_data(handle); 144 1.1 christos 145 1.1 christos isc_loop_detach(&loop); 146 1.1 christos } 147 1.1 christos 148 1.1 christos static void 149 1.1 christos destroy_cb(uv_async_t *handle) { 150 1.1 christos isc_loop_t *loop = uv_handle_get_data(handle); 151 1.1 christos 152 1.1 christos /* Again, the first close callback here is called last */ 153 1.1 christos uv_close(&loop->async_trigger, isc__async_close); 154 1.1 christos uv_close(&loop->run_trigger, isc__job_close); 155 1.1 christos uv_close(&loop->destroy_trigger, NULL); 156 1.1 christos uv_close(&loop->pause_trigger, NULL); 157 1.1 christos uv_close(&loop->quiescent, NULL); 158 1.1 christos 159 1.1 christos uv_walk(&loop->loop, loop_walk_cb, (char *)"destroy_cb"); 160 1.1 christos } 161 1.1 christos 162 1.1 christos static void 163 1.1 christos shutdown_cb(uv_async_t *handle) { 164 1.1 christos isc_loop_t *loop = uv_handle_get_data(handle); 165 1.1 christos isc_loopmgr_t *loopmgr = loop->loopmgr; 166 1.1 christos 167 1.1 christos /* Make sure, we can't be called again */ 168 1.1 christos uv_close(&loop->shutdown_trigger, shutdown_trigger_close_cb); 169 1.1 christos 170 1.3 christos /* Mark this loop as shutting down */ 171 1.3 christos loop->shuttingdown = true; 172 1.3 christos 173 1.1 christos if (DEFAULT_LOOP(loopmgr) == CURRENT_LOOP(loopmgr)) { 174 1.1 christos /* Stop the signal handlers */ 175 1.1 christos isc_signal_stop(loopmgr->sigterm); 176 1.1 christos isc_signal_stop(loopmgr->sigint); 177 1.1 christos 178 1.1 christos /* Free the signal handlers */ 179 1.1 christos isc_signal_destroy(&loopmgr->sigterm); 180 1.1 christos isc_signal_destroy(&loopmgr->sigint); 181 1.1 christos } 182 1.1 christos 183 1.1 christos enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( 184 1.1 christos &loop->async_jobs.head, &loop->async_jobs.tail, 185 1.1 christos &loop->teardown_jobs.head, &loop->teardown_jobs.tail); 186 1.1 christos INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK); 187 1.1 christos int r = uv_async_send(&loop->async_trigger); 188 1.1 christos UV_RUNTIME_CHECK(uv_async_send, r); 189 1.1 christos } 190 1.1 christos 191 1.1 christos static void 192 1.1 christos loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid, 193 1.1 christos const char *kind) { 194 1.1 christos *loop = (isc_loop_t){ 195 1.1 christos .tid = tid, 196 1.1 christos .loopmgr = loopmgr, 197 1.1 christos .run_jobs = ISC_LIST_INITIALIZER, 198 1.1 christos }; 199 1.1 christos 200 1.1 christos __cds_wfcq_init(&loop->async_jobs.head, &loop->async_jobs.tail); 201 1.1 christos __cds_wfcq_init(&loop->setup_jobs.head, &loop->setup_jobs.tail); 202 1.1 christos __cds_wfcq_init(&loop->teardown_jobs.head, &loop->teardown_jobs.tail); 203 1.1 christos 204 1.1 christos int r = uv_loop_init(&loop->loop); 205 1.1 christos UV_RUNTIME_CHECK(uv_loop_init, r); 206 1.1 christos 207 1.1 christos r = uv_async_init(&loop->loop, &loop->pause_trigger, pauseresume_cb); 208 1.1 christos UV_RUNTIME_CHECK(uv_async_init, r); 209 1.1 christos uv_handle_set_data(&loop->pause_trigger, loop); 210 1.1 christos 211 1.1 christos r = uv_async_init(&loop->loop, &loop->shutdown_trigger, shutdown_cb); 212 1.1 christos UV_RUNTIME_CHECK(uv_async_init, r); 213 1.1 christos uv_handle_set_data(&loop->shutdown_trigger, loop); 214 1.1 christos 215 1.1 christos r = uv_async_init(&loop->loop, &loop->async_trigger, isc__async_cb); 216 1.1 christos UV_RUNTIME_CHECK(uv_async_init, r); 217 1.1 christos uv_handle_set_data(&loop->async_trigger, loop); 218 1.1 christos 219 1.1 christos r = uv_idle_init(&loop->loop, &loop->run_trigger); 220 1.1 christos UV_RUNTIME_CHECK(uv_idle_init, r); 221 1.1 christos uv_handle_set_data(&loop->run_trigger, loop); 222 1.1 christos 223 1.1 christos r = uv_async_init(&loop->loop, &loop->destroy_trigger, destroy_cb); 224 1.1 christos UV_RUNTIME_CHECK(uv_async_init, r); 225 1.1 christos uv_handle_set_data(&loop->destroy_trigger, loop); 226 1.1 christos 227 1.1 christos r = uv_prepare_init(&loop->loop, &loop->quiescent); 228 1.1 christos UV_RUNTIME_CHECK(uv_prepare_init, r); 229 1.1 christos uv_handle_set_data(&loop->quiescent, loop); 230 1.1 christos 231 1.1 christos char name[16]; 232 1.1 christos snprintf(name, sizeof(name), "%s-%08" PRIx32, kind, tid); 233 1.1 christos isc_mem_create(&loop->mctx); 234 1.1 christos isc_mem_setname(loop->mctx, name); 235 1.1 christos 236 1.1 christos isc_refcount_init(&loop->references, 1); 237 1.1 christos 238 1.1 christos loop->magic = LOOP_MAGIC; 239 1.1 christos } 240 1.1 christos 241 1.1 christos static void 242 1.1 christos quiescent_cb(uv_prepare_t *handle) { 243 1.1 christos UNUSED(handle); 244 1.1 christos 245 1.1 christos #if defined(RCU_QSBR) 246 1.1 christos /* safe memory reclamation */ 247 1.1 christos rcu_quiescent_state(); 248 1.1 christos 249 1.1 christos /* mark the thread offline when polling */ 250 1.1 christos rcu_thread_offline(); 251 1.1 christos #else 252 1.1 christos INSIST(!rcu_read_ongoing()); 253 1.1 christos #endif 254 1.1 christos } 255 1.1 christos 256 1.1 christos static void 257 1.1 christos helper_close(isc_loop_t *loop) { 258 1.1 christos int r = uv_loop_close(&loop->loop); 259 1.1 christos UV_RUNTIME_CHECK(uv_loop_close, r); 260 1.1 christos 261 1.1 christos INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail)); 262 1.1 christos 263 1.1 christos isc_mem_detach(&loop->mctx); 264 1.1 christos } 265 1.1 christos 266 1.1 christos static void 267 1.1 christos loop_close(isc_loop_t *loop) { 268 1.1 christos int r = uv_loop_close(&loop->loop); 269 1.1 christos UV_RUNTIME_CHECK(uv_loop_close, r); 270 1.1 christos 271 1.1 christos INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail)); 272 1.1 christos INSIST(ISC_LIST_EMPTY(loop->run_jobs)); 273 1.1 christos 274 1.1 christos loop->magic = 0; 275 1.1 christos 276 1.1 christos isc_mem_detach(&loop->mctx); 277 1.1 christos } 278 1.1 christos 279 1.1 christos static void * 280 1.1 christos helper_thread(void *arg) { 281 1.1 christos isc_loop_t *helper = (isc_loop_t *)arg; 282 1.1 christos 283 1.1 christos int r = uv_prepare_start(&helper->quiescent, quiescent_cb); 284 1.1 christos UV_RUNTIME_CHECK(uv_prepare_start, r); 285 1.1 christos 286 1.1 christos isc_barrier_wait(&helper->loopmgr->starting); 287 1.1 christos 288 1.1 christos r = uv_run(&helper->loop, UV_RUN_DEFAULT); 289 1.1 christos UV_RUNTIME_CHECK(uv_run, r); 290 1.1 christos 291 1.1 christos /* Invalidate the helper early */ 292 1.1 christos helper->magic = 0; 293 1.1 christos 294 1.1 christos isc_barrier_wait(&helper->loopmgr->stopping); 295 1.1 christos 296 1.1 christos return NULL; 297 1.1 christos } 298 1.1 christos 299 1.1 christos static void * 300 1.1 christos loop_thread(void *arg) { 301 1.1 christos isc_loop_t *loop = (isc_loop_t *)arg; 302 1.1 christos isc_loopmgr_t *loopmgr = loop->loopmgr; 303 1.1 christos isc_loop_t *helper = &loopmgr->helpers[loop->tid]; 304 1.1 christos char name[32]; 305 1.1 christos /* Initialize the thread_local variables*/ 306 1.1 christos 307 1.1 christos REQUIRE(isc__loop_local == NULL || isc__loop_local == loop); 308 1.1 christos isc__loop_local = loop; 309 1.1 christos 310 1.1 christos isc__tid_init(loop->tid); 311 1.1 christos 312 1.1 christos /* Start the helper thread */ 313 1.1 christos isc_thread_create(helper_thread, helper, &helper->thread); 314 1.1 christos snprintf(name, sizeof(name), "isc-helper-%04" PRIu32, loop->tid); 315 1.1 christos isc_thread_setname(helper->thread, name); 316 1.1 christos 317 1.1 christos int r = uv_prepare_start(&loop->quiescent, quiescent_cb); 318 1.1 christos UV_RUNTIME_CHECK(uv_prepare_start, r); 319 1.1 christos 320 1.1 christos isc_barrier_wait(&loopmgr->starting); 321 1.1 christos 322 1.1 christos enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( 323 1.1 christos &loop->async_jobs.head, &loop->async_jobs.tail, 324 1.1 christos &loop->setup_jobs.head, &loop->setup_jobs.tail); 325 1.1 christos INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK); 326 1.1 christos 327 1.1 christos r = uv_async_send(&loop->async_trigger); 328 1.1 christos UV_RUNTIME_CHECK(uv_async_send, r); 329 1.1 christos 330 1.1 christos r = uv_run(&loop->loop, UV_RUN_DEFAULT); 331 1.1 christos UV_RUNTIME_CHECK(uv_run, r); 332 1.1 christos 333 1.1 christos isc__loop_local = NULL; 334 1.1 christos 335 1.1 christos /* Invalidate the loop early */ 336 1.1 christos loop->magic = 0; 337 1.1 christos 338 1.1 christos /* Shutdown the helper thread */ 339 1.1 christos r = uv_async_send(&helper->shutdown_trigger); 340 1.1 christos UV_RUNTIME_CHECK(uv_async_send, r); 341 1.1 christos 342 1.1 christos isc_barrier_wait(&loopmgr->stopping); 343 1.1 christos 344 1.1 christos return NULL; 345 1.1 christos } 346 1.1 christos 347 1.1 christos /** 348 1.1 christos * Public 349 1.1 christos */ 350 1.1 christos 351 1.1 christos static void 352 1.1 christos threadpool_initialize(uint32_t workers) { 353 1.1 christos char buf[11]; 354 1.1 christos int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf, 355 1.1 christos &(size_t){ sizeof(buf) }); 356 1.1 christos if (r == UV_ENOENT) { 357 1.1 christos snprintf(buf, sizeof(buf), "%" PRIu32, workers); 358 1.1 christos uv_os_setenv("UV_THREADPOOL_SIZE", buf); 359 1.1 christos } 360 1.1 christos } 361 1.1 christos 362 1.1 christos static void 363 1.1 christos loop_destroy(isc_loop_t *loop) { 364 1.1 christos int r = uv_async_send(&loop->destroy_trigger); 365 1.1 christos UV_RUNTIME_CHECK(uv_async_send, r); 366 1.1 christos } 367 1.1 christos 368 1.1 christos #if ISC_LOOP_TRACE 369 1.1 christos ISC_REFCOUNT_TRACE_IMPL(isc_loop, loop_destroy) 370 1.1 christos #else 371 1.1 christos ISC_REFCOUNT_IMPL(isc_loop, loop_destroy); 372 1.1 christos #endif 373 1.1 christos 374 1.1 christos void 375 1.1 christos isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops, isc_loopmgr_t **loopmgrp) { 376 1.1 christos isc_loopmgr_t *loopmgr = NULL; 377 1.1 christos 378 1.1 christos REQUIRE(loopmgrp != NULL && *loopmgrp == NULL); 379 1.1 christos REQUIRE(nloops > 0); 380 1.1 christos 381 1.1 christos threadpool_initialize(nloops); 382 1.1 christos isc__tid_initcount(nloops); 383 1.1 christos 384 1.1 christos loopmgr = isc_mem_get(mctx, sizeof(*loopmgr)); 385 1.1 christos *loopmgr = (isc_loopmgr_t){ 386 1.1 christos .nloops = nloops, 387 1.1 christos }; 388 1.1 christos 389 1.1 christos isc_mem_attach(mctx, &loopmgr->mctx); 390 1.1 christos 391 1.1 christos /* We need to double the number for loops and helpers */ 392 1.1 christos isc_barrier_init(&loopmgr->pausing, loopmgr->nloops * 2); 393 1.1 christos isc_barrier_init(&loopmgr->resuming, loopmgr->nloops * 2); 394 1.1 christos isc_barrier_init(&loopmgr->starting, loopmgr->nloops * 2); 395 1.1 christos isc_barrier_init(&loopmgr->stopping, loopmgr->nloops * 2); 396 1.1 christos 397 1.1 christos loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops, 398 1.1 christos sizeof(loopmgr->loops[0])); 399 1.1 christos for (size_t i = 0; i < loopmgr->nloops; i++) { 400 1.1 christos isc_loop_t *loop = &loopmgr->loops[i]; 401 1.1 christos loop_init(loop, loopmgr, i, "loop"); 402 1.1 christos } 403 1.1 christos 404 1.1 christos loopmgr->helpers = isc_mem_cget(loopmgr->mctx, loopmgr->nloops, 405 1.1 christos sizeof(loopmgr->helpers[0])); 406 1.1 christos for (size_t i = 0; i < loopmgr->nloops; i++) { 407 1.1 christos isc_loop_t *loop = &loopmgr->helpers[i]; 408 1.1 christos loop_init(loop, loopmgr, i, "helper"); 409 1.1 christos } 410 1.1 christos 411 1.1 christos loopmgr->sigint = isc_signal_new(loopmgr, isc__loopmgr_signal, loopmgr, 412 1.1 christos SIGINT); 413 1.1 christos loopmgr->sigterm = isc_signal_new(loopmgr, isc__loopmgr_signal, loopmgr, 414 1.1 christos SIGTERM); 415 1.1 christos 416 1.1 christos isc_signal_start(loopmgr->sigint); 417 1.1 christos isc_signal_start(loopmgr->sigterm); 418 1.1 christos 419 1.1 christos loopmgr->magic = LOOPMGR_MAGIC; 420 1.1 christos 421 1.1 christos *loopmgrp = loopmgr; 422 1.1 christos } 423 1.1 christos 424 1.1 christos isc_job_t * 425 1.1 christos isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { 426 1.1 christos REQUIRE(VALID_LOOP(loop)); 427 1.1 christos REQUIRE(cb != NULL); 428 1.1 christos 429 1.1 christos isc_loopmgr_t *loopmgr = loop->loopmgr; 430 1.1 christos isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job)); 431 1.1 christos *job = (isc_job_t){ 432 1.1 christos .cb = cb, 433 1.1 christos .cbarg = cbarg, 434 1.1 christos }; 435 1.1 christos 436 1.1 christos cds_wfcq_node_init(&job->wfcq_node); 437 1.1 christos 438 1.1 christos REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || 439 1.1 christos atomic_load(&loopmgr->paused)); 440 1.1 christos 441 1.1 christos cds_wfcq_enqueue(&loop->setup_jobs.head, &loop->setup_jobs.tail, 442 1.1 christos &job->wfcq_node); 443 1.1 christos 444 1.1 christos return job; 445 1.1 christos } 446 1.1 christos 447 1.1 christos isc_job_t * 448 1.1 christos isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { 449 1.1 christos REQUIRE(VALID_LOOP(loop)); 450 1.1 christos 451 1.1 christos isc_loopmgr_t *loopmgr = loop->loopmgr; 452 1.1 christos isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job)); 453 1.1 christos *job = (isc_job_t){ 454 1.1 christos .cb = cb, 455 1.1 christos .cbarg = cbarg, 456 1.1 christos }; 457 1.1 christos cds_wfcq_node_init(&job->wfcq_node); 458 1.1 christos 459 1.1 christos REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || 460 1.1 christos atomic_load(&loopmgr->paused)); 461 1.1 christos 462 1.1 christos cds_wfcq_enqueue(&loop->teardown_jobs.head, &loop->teardown_jobs.tail, 463 1.1 christos &job->wfcq_node); 464 1.1 christos 465 1.1 christos return job; 466 1.1 christos } 467 1.1 christos 468 1.1 christos void 469 1.1 christos isc_loopmgr_setup(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg) { 470 1.1 christos REQUIRE(VALID_LOOPMGR(loopmgr)); 471 1.1 christos REQUIRE(!atomic_load(&loopmgr->running) || 472 1.1 christos atomic_load(&loopmgr->paused)); 473 1.1 christos 474 1.1 christos for (size_t i = 0; i < loopmgr->nloops; i++) { 475 1.1 christos isc_loop_t *loop = &loopmgr->loops[i]; 476 1.1 christos (void)isc_loop_setup(loop, cb, cbarg); 477 1.1 christos } 478 1.1 christos } 479 1.1 christos 480 1.1 christos void 481 1.1 christos isc_loopmgr_teardown(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg) { 482 1.1 christos REQUIRE(VALID_LOOPMGR(loopmgr)); 483 1.1 christos REQUIRE(!atomic_load(&loopmgr->running) || 484 1.1 christos atomic_load(&loopmgr->paused)); 485 1.1 christos 486 1.1 christos for (size_t i = 0; i < loopmgr->nloops; i++) { 487 1.1 christos isc_loop_t *loop = &loopmgr->loops[i]; 488 1.1 christos (void)isc_loop_teardown(loop, cb, cbarg); 489 1.1 christos } 490 1.1 christos } 491 1.1 christos 492 1.1 christos void 493 1.1 christos isc_loopmgr_run(isc_loopmgr_t *loopmgr) { 494 1.1 christos REQUIRE(VALID_LOOPMGR(loopmgr)); 495 1.1 christos RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->running, 496 1.1 christos &(bool){ false }, true)); 497 1.1 christos 498 1.1 christos /* 499 1.1 christos * Always ignore SIGPIPE. 500 1.1 christos */ 501 1.1 christos ignore_signal(SIGPIPE, SIG_IGN); 502 1.1 christos 503 1.1 christos /* 504 1.1 christos * The thread 0 is this one. 505 1.1 christos */ 506 1.1 christos for (size_t i = 1; i < loopmgr->nloops; i++) { 507 1.1 christos char name[32]; 508 1.1 christos isc_loop_t *loop = &loopmgr->loops[i]; 509 1.1 christos 510 1.1 christos isc_thread_create(loop_thread, loop, &loop->thread); 511 1.1 christos 512 1.1 christos snprintf(name, sizeof(name), "isc-loop-%04zu", i); 513 1.1 christos isc_thread_setname(loop->thread, name); 514 1.1 christos } 515 1.1 christos 516 1.1 christos isc_thread_main(loop_thread, &loopmgr->loops[0]); 517 1.1 christos } 518 1.1 christos 519 1.1 christos void 520 1.1 christos isc_loopmgr_pause(isc_loopmgr_t *loopmgr) { 521 1.1 christos REQUIRE(VALID_LOOPMGR(loopmgr)); 522 1.1 christos REQUIRE(isc_tid() != ISC_TID_UNKNOWN); 523 1.1 christos 524 1.1 christos if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 525 1.1 christos isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 526 1.1 christos ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 527 1.1 christos "loop exclusive mode: starting"); 528 1.1 christos } 529 1.1 christos 530 1.1 christos for (size_t i = 0; i < loopmgr->nloops; i++) { 531 1.1 christos isc_loop_t *helper = &loopmgr->helpers[i]; 532 1.1 christos 533 1.1 christos int r = uv_async_send(&helper->pause_trigger); 534 1.1 christos UV_RUNTIME_CHECK(uv_async_send, r); 535 1.1 christos } 536 1.1 christos 537 1.1 christos for (size_t i = 0; i < loopmgr->nloops; i++) { 538 1.1 christos isc_loop_t *loop = &loopmgr->loops[i]; 539 1.1 christos 540 1.1 christos /* Skip current loop */ 541 1.1 christos if (i == isc_tid()) { 542 1.1 christos continue; 543 1.1 christos } 544 1.1 christos 545 1.1 christos int r = uv_async_send(&loop->pause_trigger); 546 1.1 christos UV_RUNTIME_CHECK(uv_async_send, r); 547 1.1 christos } 548 1.1 christos 549 1.1 christos RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->paused, 550 1.1 christos &(bool){ false }, true)); 551 1.1 christos pause_loop(CURRENT_LOOP(loopmgr)); 552 1.1 christos 553 1.1 christos if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 554 1.1 christos isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 555 1.1 christos ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 556 1.1 christos "loop exclusive mode: started"); 557 1.1 christos } 558 1.1 christos } 559 1.1 christos 560 1.1 christos void 561 1.1 christos isc_loopmgr_resume(isc_loopmgr_t *loopmgr) { 562 1.1 christos REQUIRE(VALID_LOOPMGR(loopmgr)); 563 1.1 christos 564 1.1 christos if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 565 1.1 christos isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 566 1.1 christos ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 567 1.1 christos "loop exclusive mode: ending"); 568 1.1 christos } 569 1.1 christos 570 1.1 christos RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->paused, 571 1.1 christos &(bool){ true }, false)); 572 1.1 christos resume_loop(CURRENT_LOOP(loopmgr)); 573 1.1 christos 574 1.1 christos if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 575 1.1 christos isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 576 1.1 christos ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 577 1.1 christos "loop exclusive mode: ended"); 578 1.1 christos } 579 1.1 christos } 580 1.1 christos 581 1.1 christos void 582 1.1 christos isc_loopmgr_destroy(isc_loopmgr_t **loopmgrp) { 583 1.1 christos isc_loopmgr_t *loopmgr = NULL; 584 1.1 christos 585 1.1 christos REQUIRE(loopmgrp != NULL); 586 1.1 christos REQUIRE(VALID_LOOPMGR(*loopmgrp)); 587 1.1 christos 588 1.1 christos loopmgr = *loopmgrp; 589 1.1 christos *loopmgrp = NULL; 590 1.1 christos 591 1.1 christos RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->running, 592 1.1 christos &(bool){ true }, false)); 593 1.1 christos 594 1.1 christos /* Wait for all helpers to finish */ 595 1.1 christos for (size_t i = 0; i < loopmgr->nloops; i++) { 596 1.1 christos isc_loop_t *helper = &loopmgr->helpers[i]; 597 1.1 christos isc_thread_join(helper->thread, NULL); 598 1.1 christos } 599 1.1 christos 600 1.1 christos /* First wait for all loops to finish */ 601 1.1 christos for (size_t i = 1; i < loopmgr->nloops; i++) { 602 1.1 christos isc_loop_t *loop = &loopmgr->loops[i]; 603 1.1 christos isc_thread_join(loop->thread, NULL); 604 1.1 christos } 605 1.1 christos 606 1.1 christos loopmgr->magic = 0; 607 1.1 christos 608 1.1 christos for (size_t i = 0; i < loopmgr->nloops; i++) { 609 1.1 christos isc_loop_t *helper = &loopmgr->helpers[i]; 610 1.1 christos helper_close(helper); 611 1.1 christos } 612 1.1 christos isc_mem_cput(loopmgr->mctx, loopmgr->helpers, loopmgr->nloops, 613 1.1 christos sizeof(loopmgr->helpers[0])); 614 1.1 christos 615 1.1 christos for (size_t i = 0; i < loopmgr->nloops; i++) { 616 1.1 christos isc_loop_t *loop = &loopmgr->loops[i]; 617 1.1 christos loop_close(loop); 618 1.1 christos } 619 1.1 christos isc_mem_cput(loopmgr->mctx, loopmgr->loops, loopmgr->nloops, 620 1.1 christos sizeof(loopmgr->loops[0])); 621 1.1 christos 622 1.1 christos isc_barrier_destroy(&loopmgr->starting); 623 1.1 christos isc_barrier_destroy(&loopmgr->stopping); 624 1.1 christos isc_barrier_destroy(&loopmgr->resuming); 625 1.1 christos isc_barrier_destroy(&loopmgr->pausing); 626 1.1 christos 627 1.1 christos isc_mem_putanddetach(&loopmgr->mctx, loopmgr, sizeof(*loopmgr)); 628 1.1 christos } 629 1.1 christos 630 1.1 christos uint32_t 631 1.1 christos isc_loopmgr_nloops(isc_loopmgr_t *loopmgr) { 632 1.1 christos REQUIRE(VALID_LOOPMGR(loopmgr)); 633 1.1 christos 634 1.1 christos return loopmgr->nloops; 635 1.1 christos } 636 1.1 christos 637 1.1 christos isc_mem_t * 638 1.1 christos isc_loop_getmctx(isc_loop_t *loop) { 639 1.1 christos REQUIRE(VALID_LOOP(loop)); 640 1.1 christos 641 1.1 christos return loop->mctx; 642 1.1 christos } 643 1.1 christos 644 1.1 christos isc_loop_t * 645 1.1 christos isc_loop_main(isc_loopmgr_t *loopmgr) { 646 1.1 christos REQUIRE(VALID_LOOPMGR(loopmgr)); 647 1.1 christos 648 1.1 christos return DEFAULT_LOOP(loopmgr); 649 1.1 christos } 650 1.1 christos 651 1.1 christos isc_loop_t * 652 1.1 christos isc_loop_get(isc_loopmgr_t *loopmgr, uint32_t tid) { 653 1.1 christos REQUIRE(VALID_LOOPMGR(loopmgr)); 654 1.1 christos REQUIRE(tid < loopmgr->nloops); 655 1.1 christos 656 1.1 christos return LOOP(loopmgr, tid); 657 1.1 christos } 658 1.1 christos 659 1.1 christos void 660 1.1 christos isc_loopmgr_blocking(isc_loopmgr_t *loopmgr) { 661 1.1 christos REQUIRE(VALID_LOOPMGR(loopmgr)); 662 1.1 christos 663 1.1 christos isc_signal_stop(loopmgr->sigterm); 664 1.1 christos isc_signal_stop(loopmgr->sigint); 665 1.1 christos } 666 1.1 christos 667 1.1 christos void 668 1.1 christos isc_loopmgr_nonblocking(isc_loopmgr_t *loopmgr) { 669 1.1 christos REQUIRE(VALID_LOOPMGR(loopmgr)); 670 1.1 christos 671 1.1 christos isc_signal_start(loopmgr->sigint); 672 1.1 christos isc_signal_start(loopmgr->sigterm); 673 1.1 christos } 674 1.1 christos 675 1.1 christos isc_loopmgr_t * 676 1.1 christos isc_loop_getloopmgr(isc_loop_t *loop) { 677 1.1 christos REQUIRE(VALID_LOOP(loop)); 678 1.1 christos 679 1.1 christos return loop->loopmgr; 680 1.1 christos } 681 1.1 christos 682 1.1 christos isc_time_t 683 1.1 christos isc_loop_now(isc_loop_t *loop) { 684 1.1 christos REQUIRE(VALID_LOOP(loop)); 685 1.1 christos 686 1.1 christos uint64_t msec = uv_now(&loop->loop); 687 1.1 christos isc_time_t t = { 688 1.1 christos .seconds = msec / MS_PER_SEC, 689 1.1 christos .nanoseconds = (msec % MS_PER_SEC) * NS_PER_MS, 690 1.1 christos }; 691 1.1 christos 692 1.1 christos return t; 693 1.1 christos } 694 1.1 christos 695 1.1 christos bool 696 1.1 christos isc_loop_shuttingdown(isc_loop_t *loop) { 697 1.1 christos REQUIRE(VALID_LOOP(loop)); 698 1.1 christos REQUIRE(loop->tid == isc_tid()); 699 1.1 christos 700 1.1 christos return loop->shuttingdown; 701 1.1 christos } 702