Home | History | Annotate | Line # | Download | only in src
      1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
      2  *
      3  * Permission is hereby granted, free of charge, to any person obtaining a copy
      4  * of this software and associated documentation files (the "Software"), to
      5  * deal in the Software without restriction, including without limitation the
      6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
      7  * sell copies of the Software, and to permit persons to whom the Software is
      8  * furnished to do so, subject to the following conditions:
      9  *
     10  * The above copyright notice and this permission notice shall be included in
     11  * all copies or substantial portions of the Software.
     12  *
     13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
     18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     19  * IN THE SOFTWARE.
     20  */
     21 
     22 #include "uv-common.h"
     23 
     24 #if !defined(_WIN32)
     25 # include "unix/internal.h"
     26 #endif
     27 
     28 #include <stdlib.h>
     29 
     30 #define MAX_THREADPOOL_SIZE 1024
     31 
     32 static uv_once_t once = UV_ONCE_INIT;
     33 static uv_cond_t cond;
     34 static uv_mutex_t mutex;
     35 static unsigned int idle_threads;
     36 static unsigned int slow_io_work_running;
     37 static unsigned int nthreads;
     38 static uv_thread_t* threads;
     39 static uv_thread_t default_threads[4];
     40 static struct uv__queue exit_message;
     41 static struct uv__queue wq;
     42 static struct uv__queue run_slow_work_message;
     43 static struct uv__queue slow_io_pending_wq;
     44 
     45 static unsigned int slow_work_thread_threshold(void) {
     46   return (nthreads + 1) / 2;
     47 }
     48 
     49 static void uv__cancelled(struct uv__work* w) {
     50   abort();
     51 }
     52 
     53 
     54 /* To avoid deadlock with uv_cancel() it's crucial that the worker
     55  * never holds the global mutex and the loop-local mutex at the same time.
     56  */
     57 static void worker(void* arg) {
     58   struct uv__work* w;
     59   struct uv__queue* q;
     60   int is_slow_work;
     61 
     62   uv_thread_setname("libuv-worker");
     63   uv_sem_post((uv_sem_t*) arg);
     64   arg = NULL;
     65 
     66   uv_mutex_lock(&mutex);
     67   for (;;) {
     68     /* `mutex` should always be locked at this point. */
     69 
     70     /* Keep waiting while either no work is present or only slow I/O
     71        and we're at the threshold for that. */
     72     while (uv__queue_empty(&wq) ||
     73            (uv__queue_head(&wq) == &run_slow_work_message &&
     74             uv__queue_next(&run_slow_work_message) == &wq &&
     75             slow_io_work_running >= slow_work_thread_threshold())) {
     76       idle_threads += 1;
     77       uv_cond_wait(&cond, &mutex);
     78       idle_threads -= 1;
     79     }
     80 
     81     q = uv__queue_head(&wq);
     82     if (q == &exit_message) {
     83       uv_cond_signal(&cond);
     84       uv_mutex_unlock(&mutex);
     85       break;
     86     }
     87 
     88     uv__queue_remove(q);
     89     uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */
     90 
     91     is_slow_work = 0;
     92     if (q == &run_slow_work_message) {
     93       /* If we're at the slow I/O threshold, re-schedule until after all
     94          other work in the queue is done. */
     95       if (slow_io_work_running >= slow_work_thread_threshold()) {
     96         uv__queue_insert_tail(&wq, q);
     97         continue;
     98       }
     99 
    100       /* If we encountered a request to run slow I/O work but there is none
    101          to run, that means it's cancelled => Start over. */
    102       if (uv__queue_empty(&slow_io_pending_wq))
    103         continue;
    104 
    105       is_slow_work = 1;
    106       slow_io_work_running++;
    107 
    108       q = uv__queue_head(&slow_io_pending_wq);
    109       uv__queue_remove(q);
    110       uv__queue_init(q);
    111 
    112       /* If there is more slow I/O work, schedule it to be run as well. */
    113       if (!uv__queue_empty(&slow_io_pending_wq)) {
    114         uv__queue_insert_tail(&wq, &run_slow_work_message);
    115         if (idle_threads > 0)
    116           uv_cond_signal(&cond);
    117       }
    118     }
    119 
    120     uv_mutex_unlock(&mutex);
    121 
    122     w = uv__queue_data(q, struct uv__work, wq);
    123     w->work(w);
    124 
    125     uv_mutex_lock(&w->loop->wq_mutex);
    126     w->work = NULL;  /* Signal uv_cancel() that the work req is done
    127                         executing. */
    128     uv__queue_insert_tail(&w->loop->wq, &w->wq);
    129     uv_async_send(&w->loop->wq_async);
    130     uv_mutex_unlock(&w->loop->wq_mutex);
    131 
    132     /* Lock `mutex` since that is expected at the start of the next
    133      * iteration. */
    134     uv_mutex_lock(&mutex);
    135     if (is_slow_work) {
    136       /* `slow_io_work_running` is protected by `mutex`. */
    137       slow_io_work_running--;
    138     }
    139   }
    140 }
    141 
    142 
    143 static void post(struct uv__queue* q, enum uv__work_kind kind) {
    144   uv_mutex_lock(&mutex);
    145   if (kind == UV__WORK_SLOW_IO) {
    146     /* Insert into a separate queue. */
    147     uv__queue_insert_tail(&slow_io_pending_wq, q);
    148     if (!uv__queue_empty(&run_slow_work_message)) {
    149       /* Running slow I/O tasks is already scheduled => Nothing to do here.
    150          The worker that runs said other task will schedule this one as well. */
    151       uv_mutex_unlock(&mutex);
    152       return;
    153     }
    154     q = &run_slow_work_message;
    155   }
    156 
    157   uv__queue_insert_tail(&wq, q);
    158   if (idle_threads > 0)
    159     uv_cond_signal(&cond);
    160   uv_mutex_unlock(&mutex);
    161 }
    162 
    163 
    164 #ifdef __MVS__
    165 /* TODO(itodorov) - zos: revisit when Woz compiler is available. */
    166 __attribute__((destructor))
    167 #endif
    168 void uv__threadpool_cleanup(void) {
    169   unsigned int i;
    170 
    171   if (nthreads == 0)
    172     return;
    173 
    174 #ifndef __MVS__
    175   /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
    176   post(&exit_message, UV__WORK_CPU);
    177 #endif
    178 
    179   for (i = 0; i < nthreads; i++)
    180     if (uv_thread_join(threads + i))
    181       abort();
    182 
    183   if (threads != default_threads)
    184     uv__free(threads);
    185 
    186   uv_mutex_destroy(&mutex);
    187   uv_cond_destroy(&cond);
    188 
    189   threads = NULL;
    190   nthreads = 0;
    191 }
    192 
    193 
    194 static void init_threads(void) {
    195   uv_thread_options_t config;
    196   unsigned int i;
    197   const char* val;
    198   uv_sem_t sem;
    199 
    200   nthreads = ARRAY_SIZE(default_threads);
    201   val = getenv("UV_THREADPOOL_SIZE");
    202   if (val != NULL)
    203     nthreads = atoi(val);
    204   if (nthreads == 0)
    205     nthreads = 1;
    206   if (nthreads > MAX_THREADPOOL_SIZE)
    207     nthreads = MAX_THREADPOOL_SIZE;
    208 
    209   threads = default_threads;
    210   if (nthreads > ARRAY_SIZE(default_threads)) {
    211     threads = uv__malloc(nthreads * sizeof(threads[0]));
    212     if (threads == NULL) {
    213       nthreads = ARRAY_SIZE(default_threads);
    214       threads = default_threads;
    215     }
    216   }
    217 
    218   if (uv_cond_init(&cond))
    219     abort();
    220 
    221   if (uv_mutex_init(&mutex))
    222     abort();
    223 
    224   uv__queue_init(&wq);
    225   uv__queue_init(&slow_io_pending_wq);
    226   uv__queue_init(&run_slow_work_message);
    227 
    228   if (uv_sem_init(&sem, 0))
    229     abort();
    230 
    231   config.flags = UV_THREAD_HAS_STACK_SIZE;
    232   config.stack_size = 8u << 20;  /* 8 MB */
    233 
    234   for (i = 0; i < nthreads; i++)
    235     if (uv_thread_create_ex(threads + i, &config, worker, &sem))
    236       abort();
    237 
    238   for (i = 0; i < nthreads; i++)
    239     uv_sem_wait(&sem);
    240 
    241   uv_sem_destroy(&sem);
    242 }
    243 
    244 
    245 #ifndef _WIN32
    246 static void reset_once(void) {
    247   uv_once_t child_once = UV_ONCE_INIT;
    248   memcpy(&once, &child_once, sizeof(child_once));
    249 }
    250 #endif
    251 
    252 
    253 static void init_once(void) {
    254 #ifndef _WIN32
    255   /* Re-initialize the threadpool after fork.
    256    * Note that this discards the global mutex and condition as well
    257    * as the work queue.
    258    */
    259   if (pthread_atfork(NULL, NULL, &reset_once))
    260     abort();
    261 #endif
    262   init_threads();
    263 }
    264 
    265 
    266 void uv__work_submit(uv_loop_t* loop,
    267                      struct uv__work* w,
    268                      enum uv__work_kind kind,
    269                      void (*work)(struct uv__work* w),
    270                      void (*done)(struct uv__work* w, int status)) {
    271   uv_once(&once, init_once);
    272   w->loop = loop;
    273   w->work = work;
    274   w->done = done;
    275   post(&w->wq, kind);
    276 }
    277 
    278 
    279 /* TODO(bnoordhuis) teach libuv how to cancel file operations
    280  * that go through io_uring instead of the thread pool.
    281  */
    282 static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
    283   int cancelled;
    284 
    285   uv_once(&once, init_once);  /* Ensure |mutex| is initialized. */
    286   uv_mutex_lock(&mutex);
    287   uv_mutex_lock(&w->loop->wq_mutex);
    288 
    289   cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
    290   if (cancelled)
    291     uv__queue_remove(&w->wq);
    292 
    293   uv_mutex_unlock(&w->loop->wq_mutex);
    294   uv_mutex_unlock(&mutex);
    295 
    296   if (!cancelled)
    297     return UV_EBUSY;
    298 
    299   w->work = uv__cancelled;
    300   uv_mutex_lock(&loop->wq_mutex);
    301   uv__queue_insert_tail(&loop->wq, &w->wq);
    302   uv_async_send(&loop->wq_async);
    303   uv_mutex_unlock(&loop->wq_mutex);
    304 
    305   return 0;
    306 }
    307 
    308 
    309 void uv__work_done(uv_async_t* handle) {
    310   struct uv__work* w;
    311   uv_loop_t* loop;
    312   struct uv__queue* q;
    313   struct uv__queue wq;
    314   int err;
    315   int nevents;
    316 
    317   loop = container_of(handle, uv_loop_t, wq_async);
    318   uv_mutex_lock(&loop->wq_mutex);
    319   uv__queue_move(&loop->wq, &wq);
    320   uv_mutex_unlock(&loop->wq_mutex);
    321 
    322   nevents = 0;
    323 
    324   while (!uv__queue_empty(&wq)) {
    325     q = uv__queue_head(&wq);
    326     uv__queue_remove(q);
    327 
    328     w = container_of(q, struct uv__work, wq);
    329     err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    330     w->done(w, err);
    331     nevents++;
    332   }
    333 
    334   /* This check accomplishes 2 things:
    335    * 1. Even if the queue was empty, the call to uv__work_done() should count
    336    *    as an event. Which will have been added by the event loop when
    337    *    calling this callback.
    338    * 2. Prevents accidental wrap around in case nevents == 0 events == 0.
    339    */
    340   if (nevents > 1) {
    341     /* Subtract 1 to counter the call to uv__work_done(). */
    342     uv__metrics_inc_events(loop, nevents - 1);
    343     if (uv__get_internal_fields(loop)->current_timeout == 0)
    344       uv__metrics_inc_events_waiting(loop, nevents - 1);
    345   }
    346 }
    347 
    348 
    349 static void uv__queue_work(struct uv__work* w) {
    350   uv_work_t* req = container_of(w, uv_work_t, work_req);
    351 
    352   req->work_cb(req);
    353 }
    354 
    355 
    356 static void uv__queue_done(struct uv__work* w, int err) {
    357   uv_work_t* req;
    358 
    359   req = container_of(w, uv_work_t, work_req);
    360   uv__req_unregister(req->loop);
    361 
    362   if (req->after_work_cb == NULL)
    363     return;
    364 
    365   req->after_work_cb(req, err);
    366 }
    367 
    368 
    369 int uv_queue_work(uv_loop_t* loop,
    370                   uv_work_t* req,
    371                   uv_work_cb work_cb,
    372                   uv_after_work_cb after_work_cb) {
    373   if (work_cb == NULL)
    374     return UV_EINVAL;
    375 
    376   uv__req_init(loop, req, UV_WORK);
    377   req->loop = loop;
    378   req->work_cb = work_cb;
    379   req->after_work_cb = after_work_cb;
    380   uv__work_submit(loop,
    381                   &req->work_req,
    382                   UV__WORK_CPU,
    383                   uv__queue_work,
    384                   uv__queue_done);
    385   return 0;
    386 }
    387 
    388 
    389 int uv_cancel(uv_req_t* req) {
    390   struct uv__work* wreq;
    391   uv_loop_t* loop;
    392 
    393   switch (req->type) {
    394   case UV_FS:
    395     loop =  ((uv_fs_t*) req)->loop;
    396     wreq = &((uv_fs_t*) req)->work_req;
    397     break;
    398   case UV_GETADDRINFO:
    399     loop =  ((uv_getaddrinfo_t*) req)->loop;
    400     wreq = &((uv_getaddrinfo_t*) req)->work_req;
    401     break;
    402   case UV_GETNAMEINFO:
    403     loop = ((uv_getnameinfo_t*) req)->loop;
    404     wreq = &((uv_getnameinfo_t*) req)->work_req;
    405     break;
    406   case UV_RANDOM:
    407     loop = ((uv_random_t*) req)->loop;
    408     wreq = &((uv_random_t*) req)->work_req;
    409     break;
    410   case UV_WORK:
    411     loop =  ((uv_work_t*) req)->loop;
    412     wreq = &((uv_work_t*) req)->work_req;
    413     break;
    414   default:
    415     return UV_EINVAL;
    416   }
    417 
    418   return uv__work_cancel(loop, req, wreq);
    419 }
    420