1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22 #include "uv-common.h" 23 24 #if !defined(_WIN32) 25 # include "unix/internal.h" 26 #endif 27 28 #include <stdlib.h> 29 30 #define MAX_THREADPOOL_SIZE 1024 31 32 static uv_once_t once = UV_ONCE_INIT; 33 static uv_cond_t cond; 34 static uv_mutex_t mutex; 35 static unsigned int idle_threads; 36 static unsigned int slow_io_work_running; 37 static unsigned int nthreads; 38 static uv_thread_t* threads; 39 static uv_thread_t default_threads[4]; 40 static struct uv__queue exit_message; 41 static struct uv__queue wq; 42 static struct uv__queue run_slow_work_message; 43 static struct uv__queue slow_io_pending_wq; 44 45 static unsigned int slow_work_thread_threshold(void) { 46 return (nthreads + 1) / 2; 47 } 48 49 static void uv__cancelled(struct uv__work* w) { 50 abort(); 51 } 52 53 54 /* To avoid deadlock with uv_cancel() it's crucial that the worker 55 * never holds the global mutex and the loop-local mutex at the same time. 56 */ 57 static void worker(void* arg) { 58 struct uv__work* w; 59 struct uv__queue* q; 60 int is_slow_work; 61 62 uv_thread_setname("libuv-worker"); 63 uv_sem_post((uv_sem_t*) arg); 64 arg = NULL; 65 66 uv_mutex_lock(&mutex); 67 for (;;) { 68 /* `mutex` should always be locked at this point. */ 69 70 /* Keep waiting while either no work is present or only slow I/O 71 and we're at the threshold for that. */ 72 while (uv__queue_empty(&wq) || 73 (uv__queue_head(&wq) == &run_slow_work_message && 74 uv__queue_next(&run_slow_work_message) == &wq && 75 slow_io_work_running >= slow_work_thread_threshold())) { 76 idle_threads += 1; 77 uv_cond_wait(&cond, &mutex); 78 idle_threads -= 1; 79 } 80 81 q = uv__queue_head(&wq); 82 if (q == &exit_message) { 83 uv_cond_signal(&cond); 84 uv_mutex_unlock(&mutex); 85 break; 86 } 87 88 uv__queue_remove(q); 89 uv__queue_init(q); /* Signal uv_cancel() that the work req is executing. */ 90 91 is_slow_work = 0; 92 if (q == &run_slow_work_message) { 93 /* If we're at the slow I/O threshold, re-schedule until after all 94 other work in the queue is done. */ 95 if (slow_io_work_running >= slow_work_thread_threshold()) { 96 uv__queue_insert_tail(&wq, q); 97 continue; 98 } 99 100 /* If we encountered a request to run slow I/O work but there is none 101 to run, that means it's cancelled => Start over. */ 102 if (uv__queue_empty(&slow_io_pending_wq)) 103 continue; 104 105 is_slow_work = 1; 106 slow_io_work_running++; 107 108 q = uv__queue_head(&slow_io_pending_wq); 109 uv__queue_remove(q); 110 uv__queue_init(q); 111 112 /* If there is more slow I/O work, schedule it to be run as well. */ 113 if (!uv__queue_empty(&slow_io_pending_wq)) { 114 uv__queue_insert_tail(&wq, &run_slow_work_message); 115 if (idle_threads > 0) 116 uv_cond_signal(&cond); 117 } 118 } 119 120 uv_mutex_unlock(&mutex); 121 122 w = uv__queue_data(q, struct uv__work, wq); 123 w->work(w); 124 125 uv_mutex_lock(&w->loop->wq_mutex); 126 w->work = NULL; /* Signal uv_cancel() that the work req is done 127 executing. */ 128 uv__queue_insert_tail(&w->loop->wq, &w->wq); 129 uv_async_send(&w->loop->wq_async); 130 uv_mutex_unlock(&w->loop->wq_mutex); 131 132 /* Lock `mutex` since that is expected at the start of the next 133 * iteration. */ 134 uv_mutex_lock(&mutex); 135 if (is_slow_work) { 136 /* `slow_io_work_running` is protected by `mutex`. */ 137 slow_io_work_running--; 138 } 139 } 140 } 141 142 143 static void post(struct uv__queue* q, enum uv__work_kind kind) { 144 uv_mutex_lock(&mutex); 145 if (kind == UV__WORK_SLOW_IO) { 146 /* Insert into a separate queue. */ 147 uv__queue_insert_tail(&slow_io_pending_wq, q); 148 if (!uv__queue_empty(&run_slow_work_message)) { 149 /* Running slow I/O tasks is already scheduled => Nothing to do here. 150 The worker that runs said other task will schedule this one as well. */ 151 uv_mutex_unlock(&mutex); 152 return; 153 } 154 q = &run_slow_work_message; 155 } 156 157 uv__queue_insert_tail(&wq, q); 158 if (idle_threads > 0) 159 uv_cond_signal(&cond); 160 uv_mutex_unlock(&mutex); 161 } 162 163 164 #ifdef __MVS__ 165 /* TODO(itodorov) - zos: revisit when Woz compiler is available. */ 166 __attribute__((destructor)) 167 #endif 168 void uv__threadpool_cleanup(void) { 169 unsigned int i; 170 171 if (nthreads == 0) 172 return; 173 174 #ifndef __MVS__ 175 /* TODO(gabylb) - zos: revisit when Woz compiler is available. */ 176 post(&exit_message, UV__WORK_CPU); 177 #endif 178 179 for (i = 0; i < nthreads; i++) 180 if (uv_thread_join(threads + i)) 181 abort(); 182 183 if (threads != default_threads) 184 uv__free(threads); 185 186 uv_mutex_destroy(&mutex); 187 uv_cond_destroy(&cond); 188 189 threads = NULL; 190 nthreads = 0; 191 } 192 193 194 static void init_threads(void) { 195 uv_thread_options_t config; 196 unsigned int i; 197 const char* val; 198 uv_sem_t sem; 199 200 nthreads = ARRAY_SIZE(default_threads); 201 val = getenv("UV_THREADPOOL_SIZE"); 202 if (val != NULL) 203 nthreads = atoi(val); 204 if (nthreads == 0) 205 nthreads = 1; 206 if (nthreads > MAX_THREADPOOL_SIZE) 207 nthreads = MAX_THREADPOOL_SIZE; 208 209 threads = default_threads; 210 if (nthreads > ARRAY_SIZE(default_threads)) { 211 threads = uv__malloc(nthreads * sizeof(threads[0])); 212 if (threads == NULL) { 213 nthreads = ARRAY_SIZE(default_threads); 214 threads = default_threads; 215 } 216 } 217 218 if (uv_cond_init(&cond)) 219 abort(); 220 221 if (uv_mutex_init(&mutex)) 222 abort(); 223 224 uv__queue_init(&wq); 225 uv__queue_init(&slow_io_pending_wq); 226 uv__queue_init(&run_slow_work_message); 227 228 if (uv_sem_init(&sem, 0)) 229 abort(); 230 231 config.flags = UV_THREAD_HAS_STACK_SIZE; 232 config.stack_size = 8u << 20; /* 8 MB */ 233 234 for (i = 0; i < nthreads; i++) 235 if (uv_thread_create_ex(threads + i, &config, worker, &sem)) 236 abort(); 237 238 for (i = 0; i < nthreads; i++) 239 uv_sem_wait(&sem); 240 241 uv_sem_destroy(&sem); 242 } 243 244 245 #ifndef _WIN32 246 static void reset_once(void) { 247 uv_once_t child_once = UV_ONCE_INIT; 248 memcpy(&once, &child_once, sizeof(child_once)); 249 } 250 #endif 251 252 253 static void init_once(void) { 254 #ifndef _WIN32 255 /* Re-initialize the threadpool after fork. 256 * Note that this discards the global mutex and condition as well 257 * as the work queue. 258 */ 259 if (pthread_atfork(NULL, NULL, &reset_once)) 260 abort(); 261 #endif 262 init_threads(); 263 } 264 265 266 void uv__work_submit(uv_loop_t* loop, 267 struct uv__work* w, 268 enum uv__work_kind kind, 269 void (*work)(struct uv__work* w), 270 void (*done)(struct uv__work* w, int status)) { 271 uv_once(&once, init_once); 272 w->loop = loop; 273 w->work = work; 274 w->done = done; 275 post(&w->wq, kind); 276 } 277 278 279 /* TODO(bnoordhuis) teach libuv how to cancel file operations 280 * that go through io_uring instead of the thread pool. 281 */ 282 static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { 283 int cancelled; 284 285 uv_once(&once, init_once); /* Ensure |mutex| is initialized. */ 286 uv_mutex_lock(&mutex); 287 uv_mutex_lock(&w->loop->wq_mutex); 288 289 cancelled = !uv__queue_empty(&w->wq) && w->work != NULL; 290 if (cancelled) 291 uv__queue_remove(&w->wq); 292 293 uv_mutex_unlock(&w->loop->wq_mutex); 294 uv_mutex_unlock(&mutex); 295 296 if (!cancelled) 297 return UV_EBUSY; 298 299 w->work = uv__cancelled; 300 uv_mutex_lock(&loop->wq_mutex); 301 uv__queue_insert_tail(&loop->wq, &w->wq); 302 uv_async_send(&loop->wq_async); 303 uv_mutex_unlock(&loop->wq_mutex); 304 305 return 0; 306 } 307 308 309 void uv__work_done(uv_async_t* handle) { 310 struct uv__work* w; 311 uv_loop_t* loop; 312 struct uv__queue* q; 313 struct uv__queue wq; 314 int err; 315 int nevents; 316 317 loop = container_of(handle, uv_loop_t, wq_async); 318 uv_mutex_lock(&loop->wq_mutex); 319 uv__queue_move(&loop->wq, &wq); 320 uv_mutex_unlock(&loop->wq_mutex); 321 322 nevents = 0; 323 324 while (!uv__queue_empty(&wq)) { 325 q = uv__queue_head(&wq); 326 uv__queue_remove(q); 327 328 w = container_of(q, struct uv__work, wq); 329 err = (w->work == uv__cancelled) ? UV_ECANCELED : 0; 330 w->done(w, err); 331 nevents++; 332 } 333 334 /* This check accomplishes 2 things: 335 * 1. Even if the queue was empty, the call to uv__work_done() should count 336 * as an event. Which will have been added by the event loop when 337 * calling this callback. 338 * 2. Prevents accidental wrap around in case nevents == 0 events == 0. 339 */ 340 if (nevents > 1) { 341 /* Subtract 1 to counter the call to uv__work_done(). */ 342 uv__metrics_inc_events(loop, nevents - 1); 343 if (uv__get_internal_fields(loop)->current_timeout == 0) 344 uv__metrics_inc_events_waiting(loop, nevents - 1); 345 } 346 } 347 348 349 static void uv__queue_work(struct uv__work* w) { 350 uv_work_t* req = container_of(w, uv_work_t, work_req); 351 352 req->work_cb(req); 353 } 354 355 356 static void uv__queue_done(struct uv__work* w, int err) { 357 uv_work_t* req; 358 359 req = container_of(w, uv_work_t, work_req); 360 uv__req_unregister(req->loop); 361 362 if (req->after_work_cb == NULL) 363 return; 364 365 req->after_work_cb(req, err); 366 } 367 368 369 int uv_queue_work(uv_loop_t* loop, 370 uv_work_t* req, 371 uv_work_cb work_cb, 372 uv_after_work_cb after_work_cb) { 373 if (work_cb == NULL) 374 return UV_EINVAL; 375 376 uv__req_init(loop, req, UV_WORK); 377 req->loop = loop; 378 req->work_cb = work_cb; 379 req->after_work_cb = after_work_cb; 380 uv__work_submit(loop, 381 &req->work_req, 382 UV__WORK_CPU, 383 uv__queue_work, 384 uv__queue_done); 385 return 0; 386 } 387 388 389 int uv_cancel(uv_req_t* req) { 390 struct uv__work* wreq; 391 uv_loop_t* loop; 392 393 switch (req->type) { 394 case UV_FS: 395 loop = ((uv_fs_t*) req)->loop; 396 wreq = &((uv_fs_t*) req)->work_req; 397 break; 398 case UV_GETADDRINFO: 399 loop = ((uv_getaddrinfo_t*) req)->loop; 400 wreq = &((uv_getaddrinfo_t*) req)->work_req; 401 break; 402 case UV_GETNAMEINFO: 403 loop = ((uv_getnameinfo_t*) req)->loop; 404 wreq = &((uv_getnameinfo_t*) req)->work_req; 405 break; 406 case UV_RANDOM: 407 loop = ((uv_random_t*) req)->loop; 408 wreq = &((uv_random_t*) req)->work_req; 409 break; 410 case UV_WORK: 411 loop = ((uv_work_t*) req)->loop; 412 wreq = &((uv_work_t*) req)->work_req; 413 break; 414 default: 415 return UV_EINVAL; 416 } 417 418 return uv__work_cancel(loop, req, wreq); 419 } 420