Home | History | Annotate | Line # | Download | only in src
      1 // SPDX-FileCopyrightText: 2010 Paul E. McKenney <paulmck (at) linux.vnet.ibm.com>
      2 // SPDX-FileCopyrightText: 2017 Mathieu Desnoyers <mathieu.desnoyers (at) efficios.com>
      3 //
      4 // SPDX-License-Identifier: LGPL-2.1-or-later
      5 
      6 /*
 * Userspace RCU library - Userspace workqueues
      8  */
      9 
     10 #define _LGPL_SOURCE
     11 #include <stdio.h>
     12 #include <pthread.h>
     13 #include <signal.h>
     14 #include <stdlib.h>
     15 #include <stdint.h>
     16 #include <string.h>
     17 #include <errno.h>
     18 #include <poll.h>
     19 #include <sys/time.h>
     20 #include <unistd.h>
     21 #include <sched.h>
     22 
     23 #include "compat-getcpu.h"
     24 #include <urcu/assert.h>
     25 #include <urcu/wfcqueue.h>
     26 #include <urcu/pointer.h>
     27 #include <urcu/list.h>
     28 #include <urcu/futex.h>
     29 #include <urcu/tls-compat.h>
     30 #include <urcu/ref.h>
     31 #include "urcu-die.h"
     32 
     33 #include "workqueue.h"
     34 
/*
 * The worker re-checks its CPU affinity once every
 * SET_AFFINITY_CHECK_PERIOD loop iterations. The period must stay a
 * power of two so that masking the iteration counter selects exactly
 * one iteration per period.
 */
#define SET_AFFINITY_CHECK_PERIOD		256U	/* 1U << 8 */
#define SET_AFFINITY_CHECK_PERIOD_MASK		(SET_AFFINITY_CHECK_PERIOD - 1U)
     37 
/*
 * Data structure that identifies a workqueue: a wait-free concurrent
 * queue of work items drained by a single worker thread (tid), plus the
 * state and optional hooks that drive that worker.
 */

struct urcu_workqueue {
	/*
	 * We do not align head on a different cache-line than tail
	 * mainly because workqueue threads use batching ("splice") to
	 * get an entire list of callbacks, which effectively empties
	 * the queue, and requires to touch the tail anyway.
	 */
	struct cds_wfcq_tail cbs_tail;
	struct cds_wfcq_head cbs_head;
	/*
	 * URCU_WORKQUEUE_* bits (RT, PAUSE, PAUSED, STOP). Accessed with
	 * uatomic_* operations while the worker thread is running.
	 */
	unsigned long flags;
	/*
	 * Worker sleep/wake word, unused in RT mode. The worker decrements
	 * it (normally 0 -> -1) before looking for work and sleeps while it
	 * holds -1; enqueuers reset -1 to 0 and issue FUTEX_WAKE.
	 */
	int32_t futex;
	unsigned long qlen; /* maintained for debugging. */
	/* Worker thread handle; reset to 0 once the worker has been joined. */
	pthread_t tid;
	/* CPU the worker is pinned to; a negative value disables pinning. */
	int cpu_affinity;
	/* Worker loop counter used to rate-limit CPU affinity re-checks. */
	unsigned long loop_count;
	/* Opaque user pointer passed as the last argument of every hook. */
	void *priv;
	/* Optional hooks (skipped when NULL), all called from the worker thread: */
	/* before running each non-empty batch of work items. */
	void (*grace_period_fct)(struct urcu_workqueue *workqueue, void *priv);
	/* once, when the worker thread starts. */
	void (*initialize_worker_fct)(struct urcu_workqueue *workqueue, void *priv);
	/* once, just before the worker thread returns. */
	void (*finalize_worker_fct)(struct urcu_workqueue *workqueue, void *priv);
	/* when a pause is requested, before acknowledging it with PAUSED. */
	void (*worker_before_pause_fct)(struct urcu_workqueue *workqueue, void *priv);
	/* after the pause has been lifted. */
	void (*worker_after_resume_fct)(struct urcu_workqueue *workqueue, void *priv);
	/* every loop iteration, before possibly waiting for more work. */
	void (*worker_before_wait_fct)(struct urcu_workqueue *workqueue, void *priv);
	/* every loop iteration, after the wait step (whether or not it slept). */
	void (*worker_after_wake_up_fct)(struct urcu_workqueue *workqueue, void *priv);
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
     64 
/*
 * Synchronization object used to wait until every completion work item
 * queued against it has been executed by its workqueue.
 */
struct urcu_workqueue_completion {
	/* Number of queued completion work items that have not run yet. */
	int barrier_count;
	/* Wait word for urcu_workqueue_wait_completion(); -1 while the waiter may sleep. */
	int32_t futex;
	/* Creator's reference plus one reference per queued completion work item. */
	struct urcu_ref ref;
};
     70 
/* Heap-allocated work item queued by urcu_workqueue_queue_completion(). */
struct urcu_workqueue_completion_work {
	/* Embedded work item; the wrapper is recovered with caa_container_of(). */
	struct urcu_work work;
	/* Completion signalled (and unreferenced) when this item runs. */
	struct urcu_workqueue_completion *completion;
};
     75 
/*
 * Keep the calling worker thread pinned to workqueue->cpu_affinity.
 *
 * Affinity can be lost through CPU hot-unplug/hotplug or cpuset(7), so
 * it is re-applied whenever the thread is found on another CPU. To keep
 * the cost low, that check runs only once every
 * SET_AFFINITY_CHECK_PERIOD calls. A negative cpu_affinity disables
 * pinning altogether.
 *
 * Returns 0 on success (including the tolerated EINVAL case), or the
 * non-zero sched_setaffinity() result with errno set.
 */
#ifdef HAVE_SCHED_SETAFFINITY
static int set_thread_cpu_affinity(struct urcu_workqueue *workqueue)
{
	cpu_set_t target_cpus;
	int err;

	if (workqueue->cpu_affinity < 0)
		return 0;	/* pinning not requested */

	workqueue->loop_count++;
	if ((workqueue->loop_count & SET_AFFINITY_CHECK_PERIOD_MASK) != 0)
		return 0;	/* not a check iteration */

	if (urcu_sched_getcpu() == workqueue->cpu_affinity)
		return 0;	/* still on the requested CPU */

	CPU_ZERO(&target_cpus);
	CPU_SET(workqueue->cpu_affinity, &target_cpus);
	err = sched_setaffinity(0, sizeof(target_cpus), &target_cpus);
	if (err == 0 || errno != EINVAL)
		return err;

	/*
	 * EINVAL is expected when the CPU is hot-unplugged or excluded by
	 * cpuset(7); swallow it and try again at the next check period.
	 */
	errno = 0;
	return 0;
}
#else
static int set_thread_cpu_affinity(struct urcu_workqueue *workqueue __attribute__((unused)))
{
	/* No sched_setaffinity() on this platform: pinning is a no-op. */
	return 0;
}
#endif
    115 
    116 static void futex_wait(int32_t *futex)
    117 {
    118 	/* Read condition before read futex */
    119 	cmm_smp_mb();
    120 	while (uatomic_read(futex) == -1) {
    121 		if (!futex_async(futex, FUTEX_WAIT, -1, NULL, NULL, 0)) {
    122 			/*
    123 			 * Prior queued wakeups queued by unrelated code
    124 			 * using the same address can cause futex wait to
    125 			 * return 0 even through the futex value is still
    126 			 * -1 (spurious wakeups). Check the value again
    127 			 * in user-space to validate whether it really
    128 			 * differs from -1.
    129 			 */
    130 			continue;
    131 		}
    132 		switch (errno) {
    133 		case EAGAIN:
    134 			/* Value already changed. */
    135 			return;
    136 		case EINTR:
    137 			/* Retry if interrupted by signal. */
    138 			break;	/* Get out of switch. Check again. */
    139 		default:
    140 			/* Unexpected error. */
    141 			urcu_die(errno);
    142 		}
    143 	}
    144 }
    145 
    146 static void futex_wake_up(int32_t *futex)
    147 {
    148 	/* Write to condition before reading/writing futex */
    149 	cmm_smp_mb();
    150 	if (caa_unlikely(uatomic_read(futex) == -1)) {
    151 		uatomic_set(futex, 0);
    152 		if (futex_async(futex, FUTEX_WAKE, 1,
    153 				NULL, NULL, 0) < 0)
    154 			urcu_die(errno);
    155 	}
    156 }
    157 
/*
 * This is the code run by each worker thread.
 *
 * @arg: the struct urcu_workqueue served by this thread.
 *
 * Each loop iteration re-checks CPU affinity and honours a pause
 * request. It then splices every queued work item into a local list
 * and runs them in queue order, and exits if URCU_WORKQUEUE_STOP is
 * set. Otherwise, if the queue is still empty, it waits for more work:
 * a futex sleep in normal mode, or a 10 ms poll in RT mode. The
 * optional hooks stored in the workqueue are invoked at the matching
 * points.
 *
 * Always returns NULL (urcu_workqueue_destroy_worker() checks this).
 */

static void *workqueue_thread(void *arg)
{
	unsigned long cbcount;
	struct urcu_workqueue *workqueue = (struct urcu_workqueue *) arg;
	/* RT mode is sampled once, when the worker starts. */
	int rt = !!(uatomic_read(&workqueue->flags) & URCU_WORKQUEUE_RT);

	if (set_thread_cpu_affinity(workqueue))
		urcu_die(errno);

	if (workqueue->initialize_worker_fct)
		workqueue->initialize_worker_fct(workqueue, workqueue->priv);

	if (!rt) {
		/*
		 * Announce "about to look for work" (normally 0 -> -1).
		 * An enqueuer running futex_wake_up() concurrently either
		 * sees -1 and wakes us, or its item is seen by the splice
		 * below.
		 */
		uatomic_dec(&workqueue->futex);
		/* Decrement futex before reading workqueue */
		cmm_smp_mb();
	}
	for (;;) {
		struct cds_wfcq_head cbs_tmp_head;
		struct cds_wfcq_tail cbs_tmp_tail;
		struct cds_wfcq_node *cbs, *cbs_tmp_n;
		enum cds_wfcq_ret splice_ret;

		/* Re-applies pinning at most once per SET_AFFINITY_CHECK_PERIOD loops. */
		if (set_thread_cpu_affinity(workqueue))
			urcu_die(errno);

		if (uatomic_read(&workqueue->flags) & URCU_WORKQUEUE_PAUSE) {
			/*
			 * Pause requested. Become quiescent: remove
			 * ourself from all global lists, and don't
			 * process any callback. The callback lists may
			 * still be non-empty though.
			 *
			 * Acknowledge with PAUSED (which
			 * urcu_workqueue_pause_worker() waits for), then
			 * poll every 1 ms until PAUSE is cleared, and
			 * clear PAUSED (which
			 * urcu_workqueue_resume_worker() waits for).
			 */
			if (workqueue->worker_before_pause_fct)
				workqueue->worker_before_pause_fct(workqueue, workqueue->priv);
			cmm_smp_mb__before_uatomic_or();
			uatomic_or(&workqueue->flags, URCU_WORKQUEUE_PAUSED);
			while ((uatomic_read(&workqueue->flags) & URCU_WORKQUEUE_PAUSE) != 0)
				(void) poll(NULL, 0, 1);
			uatomic_and(&workqueue->flags, ~URCU_WORKQUEUE_PAUSED);
			cmm_smp_mb__after_uatomic_and();
			if (workqueue->worker_after_resume_fct)
				workqueue->worker_after_resume_fct(workqueue, workqueue->priv);
		}

		/*
		 * Move every queued item to a private list in one splice.
		 * The asserts encode that the blocking splice never
		 * reports WOULDBLOCK, and that the freshly initialized
		 * destination is empty.
		 */
		cds_wfcq_init(&cbs_tmp_head, &cbs_tmp_tail);
		splice_ret = __cds_wfcq_splice_blocking(&cbs_tmp_head,
			&cbs_tmp_tail, &workqueue->cbs_head, &workqueue->cbs_tail);
		urcu_posix_assert(splice_ret != CDS_WFCQ_RET_WOULDBLOCK);
		urcu_posix_assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
		if (splice_ret != CDS_WFCQ_RET_SRC_EMPTY) {
			/* Called once per non-empty batch, before any item runs. */
			if (workqueue->grace_period_fct)
				workqueue->grace_period_fct(workqueue, workqueue->priv);
			cbcount = 0;
			/*
			 * "_safe" iteration: a work function may free its
			 * own node (e.g. _urcu_workqueue_wait_complete()).
			 */
			__cds_wfcq_for_each_blocking_safe(&cbs_tmp_head,
					&cbs_tmp_tail, cbs, cbs_tmp_n) {
				struct urcu_work *uwp;

				uwp = caa_container_of(cbs,
					struct urcu_work, next);
				uwp->func(uwp);
				cbcount++;
			}
			uatomic_sub(&workqueue->qlen, cbcount);
		}
		/* STOP is honoured only after the current batch has run. */
		if (uatomic_read(&workqueue->flags) & URCU_WORKQUEUE_STOP)
			break;
		if (workqueue->worker_before_wait_fct)
			workqueue->worker_before_wait_fct(workqueue, workqueue->priv);
		if (!rt) {
			/*
			 * Sleep only if nothing was enqueued since the
			 * splice. futex_wait() returns once an enqueuer has
			 * reset the futex from -1 to 0, and decrementing
			 * again re-arms it. If work is already pending, the
			 * futex keeps its value and the loop runs the new
			 * work right away.
			 */
			if (cds_wfcq_empty(&workqueue->cbs_head,
					&workqueue->cbs_tail)) {
				futex_wait(&workqueue->futex);
				uatomic_dec(&workqueue->futex);
				/*
				 * Decrement futex before reading
				 * urcu_work list.
				 */
				cmm_smp_mb();
			}
		} else {
			/* RT mode never sleeps on the futex: poll every 10 ms. */
			if (cds_wfcq_empty(&workqueue->cbs_head,
					&workqueue->cbs_tail)) {
				(void) poll(NULL, 0, 10);
			}
		}
		if (workqueue->worker_after_wake_up_fct)
			workqueue->worker_after_wake_up_fct(workqueue, workqueue->priv);
	}
	if (!rt) {
		/*
		 * Read urcu_work list before write futex.
		 * The futex is left at 0, presumably so a later worker
		 * starts from the initial value.
		 */
		cmm_smp_mb();
		uatomic_set(&workqueue->futex, 0);
	}
	if (workqueue->finalize_worker_fct)
		workqueue->finalize_worker_fct(workqueue, workqueue->priv);
	return NULL;
}
    260 
    261 struct urcu_workqueue *urcu_workqueue_create(unsigned long flags,
    262 		int cpu_affinity, void *priv,
    263 		void (*grace_period_fct)(struct urcu_workqueue *workqueue, void *priv),
    264 		void (*initialize_worker_fct)(struct urcu_workqueue *workqueue, void *priv),
    265 		void (*finalize_worker_fct)(struct urcu_workqueue *workqueue, void *priv),
    266 		void (*worker_before_wait_fct)(struct urcu_workqueue *workqueue, void *priv),
    267 		void (*worker_after_wake_up_fct)(struct urcu_workqueue *workqueue, void *priv),
    268 		void (*worker_before_pause_fct)(struct urcu_workqueue *workqueue, void *priv),
    269 		void (*worker_after_resume_fct)(struct urcu_workqueue *workqueue, void *priv))
    270 {
    271 	struct urcu_workqueue *workqueue;
    272 	int ret;
    273 	sigset_t newmask, oldmask;
    274 
    275 	workqueue = malloc(sizeof(*workqueue));
    276 	if (workqueue == NULL)
    277 		urcu_die(errno);
    278 	memset(workqueue, '\0', sizeof(*workqueue));
    279 	cds_wfcq_init(&workqueue->cbs_head, &workqueue->cbs_tail);
    280 	workqueue->qlen = 0;
    281 	workqueue->futex = 0;
    282 	workqueue->flags = flags;
    283 	workqueue->priv = priv;
    284 	workqueue->grace_period_fct = grace_period_fct;
    285 	workqueue->initialize_worker_fct = initialize_worker_fct;
    286 	workqueue->finalize_worker_fct = finalize_worker_fct;
    287 	workqueue->worker_before_wait_fct = worker_before_wait_fct;
    288 	workqueue->worker_after_wake_up_fct = worker_after_wake_up_fct;
    289 	workqueue->worker_before_pause_fct = worker_before_pause_fct;
    290 	workqueue->worker_after_resume_fct = worker_after_resume_fct;
    291 	workqueue->cpu_affinity = cpu_affinity;
    292 	workqueue->loop_count = 0;
    293 	cmm_smp_mb();  /* Structure initialized before pointer is planted. */
    294 
    295 	ret = sigfillset(&newmask);
    296 	urcu_posix_assert(!ret);
    297 	ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
    298 	urcu_posix_assert(!ret);
    299 
    300 	ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue);
    301 	if (ret) {
    302 		urcu_die(ret);
    303 	}
    304 
    305 	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
    306 	urcu_posix_assert(!ret);
    307 
    308 	return workqueue;
    309 }
    310 
    311 static void wake_worker_thread(struct urcu_workqueue *workqueue)
    312 {
    313 	if (!(_CMM_LOAD_SHARED(workqueue->flags) & URCU_WORKQUEUE_RT))
    314 		futex_wake_up(&workqueue->futex);
    315 }
    316 
    317 static int urcu_workqueue_destroy_worker(struct urcu_workqueue *workqueue)
    318 {
    319 	int ret;
    320 	void *retval;
    321 
    322 	uatomic_or(&workqueue->flags, URCU_WORKQUEUE_STOP);
    323 	wake_worker_thread(workqueue);
    324 
    325 	ret = pthread_join(workqueue->tid, &retval);
    326 	if (ret) {
    327 		urcu_die(ret);
    328 	}
    329 	if (retval != NULL) {
    330 		urcu_die(EINVAL);
    331 	}
    332 	workqueue->flags &= ~URCU_WORKQUEUE_STOP;
    333 	workqueue->tid = 0;
    334 	return 0;
    335 }
    336 
    337 void urcu_workqueue_destroy(struct urcu_workqueue *workqueue)
    338 {
    339 	if (workqueue == NULL) {
    340 		return;
    341 	}
    342 	if (urcu_workqueue_destroy_worker(workqueue)) {
    343 		urcu_die(errno);
    344 	}
    345 	urcu_posix_assert(cds_wfcq_empty(&workqueue->cbs_head, &workqueue->cbs_tail));
    346 	free(workqueue);
    347 }
    348 
    349 void urcu_workqueue_queue_work(struct urcu_workqueue *workqueue,
    350 		      struct urcu_work *work,
    351 		      void (*func)(struct urcu_work *work))
    352 {
    353 	cds_wfcq_node_init(&work->next);
    354 	work->func = func;
    355 	cds_wfcq_enqueue(&workqueue->cbs_head, &workqueue->cbs_tail, &work->next);
    356 	uatomic_inc(&workqueue->qlen);
    357 	wake_worker_thread(workqueue);
    358 }
    359 
    360 static
    361 void free_completion(struct urcu_ref *ref)
    362 {
    363 	struct urcu_workqueue_completion *completion;
    364 
    365 	completion = caa_container_of(ref, struct urcu_workqueue_completion, ref);
    366 	free(completion);
    367 }
    368 
    369 static
    370 void _urcu_workqueue_wait_complete(struct urcu_work *work)
    371 {
    372 	struct urcu_workqueue_completion_work *completion_work;
    373 	struct urcu_workqueue_completion *completion;
    374 
    375 	completion_work = caa_container_of(work, struct urcu_workqueue_completion_work, work);
    376 	completion = completion_work->completion;
    377 	if (!uatomic_sub_return(&completion->barrier_count, 1))
    378 		futex_wake_up(&completion->futex);
    379 	urcu_ref_put(&completion->ref, free_completion);
    380 	free(completion_work);
    381 }
    382 
    383 struct urcu_workqueue_completion *urcu_workqueue_create_completion(void)
    384 {
    385 	struct urcu_workqueue_completion *completion;
    386 
    387 	completion = calloc(1, sizeof(*completion));
    388 	if (!completion)
    389 		urcu_die(errno);
    390 	urcu_ref_set(&completion->ref, 1);
    391 	completion->barrier_count = 0;
    392 	return completion;
    393 }
    394 
    395 void urcu_workqueue_destroy_completion(struct urcu_workqueue_completion *completion)
    396 {
    397 	urcu_ref_put(&completion->ref, free_completion);
    398 }
    399 
    400 void urcu_workqueue_wait_completion(struct urcu_workqueue_completion *completion)
    401 {
    402 	/* Wait for them */
    403 	for (;;) {
    404 		uatomic_dec(&completion->futex);
    405 		/* Decrement futex before reading barrier_count */
    406 		cmm_smp_mb();
    407 		if (!uatomic_read(&completion->barrier_count))
    408 			break;
    409 		futex_wait(&completion->futex);
    410 	}
    411 }
    412 
    413 void urcu_workqueue_queue_completion(struct urcu_workqueue *workqueue,
    414 		struct urcu_workqueue_completion *completion)
    415 {
    416 	struct urcu_workqueue_completion_work *work;
    417 
    418 	work = calloc(1, sizeof(*work));
    419 	if (!work)
    420 		urcu_die(errno);
    421 	work->completion = completion;
    422 	urcu_ref_get(&completion->ref);
    423 	uatomic_inc(&completion->barrier_count);
    424 	urcu_workqueue_queue_work(workqueue, &work->work, _urcu_workqueue_wait_complete);
    425 }
    426 
    427 /*
    428  * Wait for all in-flight work to complete execution.
    429  */
    430 void urcu_workqueue_flush_queued_work(struct urcu_workqueue *workqueue)
    431 {
    432 	struct urcu_workqueue_completion *completion;
    433 
    434 	completion = urcu_workqueue_create_completion();
    435 	if (!completion)
    436 		urcu_die(ENOMEM);
    437 	urcu_workqueue_queue_completion(workqueue, completion);
    438 	urcu_workqueue_wait_completion(completion);
    439 	urcu_workqueue_destroy_completion(completion);
    440 }
    441 
    442 /* To be used in before fork handler. */
    443 void urcu_workqueue_pause_worker(struct urcu_workqueue *workqueue)
    444 {
    445 	uatomic_or(&workqueue->flags, URCU_WORKQUEUE_PAUSE);
    446 	cmm_smp_mb__after_uatomic_or();
    447 	wake_worker_thread(workqueue);
    448 
    449 	while ((uatomic_read(&workqueue->flags) & URCU_WORKQUEUE_PAUSED) == 0)
    450 		(void) poll(NULL, 0, 1);
    451 }
    452 
    453 /* To be used in after fork parent handler. */
    454 void urcu_workqueue_resume_worker(struct urcu_workqueue *workqueue)
    455 {
    456 	uatomic_and(&workqueue->flags, ~URCU_WORKQUEUE_PAUSE);
    457 	while ((uatomic_read(&workqueue->flags) & URCU_WORKQUEUE_PAUSED) != 0)
    458 		(void) poll(NULL, 0, 1);
    459 }
    460 
    461 void urcu_workqueue_create_worker(struct urcu_workqueue *workqueue)
    462 {
    463 	int ret;
    464 	sigset_t newmask, oldmask;
    465 
    466 	/* Clear workqueue state from parent. */
    467 	workqueue->flags &= ~URCU_WORKQUEUE_PAUSED;
    468 	workqueue->flags &= ~URCU_WORKQUEUE_PAUSE;
    469 	workqueue->tid = 0;
    470 
    471 	ret = sigfillset(&newmask);
    472 	urcu_posix_assert(!ret);
    473 	ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
    474 	urcu_posix_assert(!ret);
    475 
    476 	ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue);
    477 	if (ret) {
    478 		urcu_die(ret);
    479 	}
    480 
    481 	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
    482 	urcu_posix_assert(!ret);
    483 }
    484