Home | History | Annotate | Line # | Download | only in internal
      1 #ifndef JEMALLOC_INTERNAL_MPSC_QUEUE_H
      2 #define JEMALLOC_INTERNAL_MPSC_QUEUE_H
      3 
      4 #include "jemalloc/internal/atomic.h"
      5 
      6 /*
      7  * A concurrent implementation of a multi-producer, single-consumer queue.  It
      8  * supports three concurrent operations:
      9  * - Push
     10  * - Push batch
     11  * - Pop batch
     12  *
     13  * These operations are all lock-free.
     14  *
     15  * The implementation is the simple two-stack queue built on a Treiber stack.
     16  * It's not terribly efficient, but this isn't expected to go into anywhere with
     17  * hot code.  In fact, we don't really even need queue semantics in any
     18  * anticipated use cases; we could get away with just the stack.  But this way
     19  * lets us frame the API in terms of the existing list types, which is a nice
     20  * convenience.  We can save on cache misses by introducing our own (parallel)
     21  * single-linked list type here, and dropping FIFO semantics, if we need this to
     22  * get faster.  Since we're currently providing queue semantics though, we use
     23  * the prev field in the link rather than the next field for Treiber-stack
     24  * linkage, so that we can preserve order for batch-pushed lists (recall that the
     25  * two-stack trick reverses order in the lock-free first stack).
     26  */
     27 
/*
 * Expands to the anonymous struct type of a queue.  Its only member is
 * `tail`, an atomic pointer.  a_type is accepted but not referenced by the
 * expansion.
 */
#define mpsc_queue(a_type) struct { atomic_p_t tail; }
     32 
/*
 * Emits the prototypes of the four queue operations, each named by pasting
 * a_prefix onto the operation name and qualified with a_attr:
 * - a_queue_type: the queue type the operations act on.
 * - a_type: the element type.
 * - a_list_type: the list type used to pass batches in and out.
 */
#define mpsc_queue_proto(a_attr, a_prefix, a_queue_type, a_type,	\
    a_list_type)							\
/* Set up queue as an empty queue. */					\
a_attr void a_prefix##new(a_queue_type *queue);				\
/* Move every item of src into the queue; src is cleared. */		\
a_attr void a_prefix##push_batch(a_queue_type *queue,			\
    a_list_type *src);							\
/* Add the single item node to the queue. */				\
a_attr void a_prefix##push(a_queue_type *queue, a_type *node);		\
/*									\
 * Drain the queue into the list at dst.  dst must already be		\
 * initialized; any items it already holds stay in it.			\
 */									\
a_attr void a_prefix##pop_batch(a_queue_type *queue,			\
    a_list_type *dst);
     51 
/*
 * Generates the definitions of the queue operations declared by
 * mpsc_queue_proto:
 * - a_attr: attributes / storage class prepended to each definition.
 * - a_prefix: name prefix; the functions are a_prefix##new, a_prefix##push,
 *   a_prefix##push_batch and a_prefix##pop_batch.
 * - a_queue_type: the queue type (holds the atomic `tail` pointer).
 * - a_type: the element type.
 * - a_list_type: the list type used for batches (driven via the ql_* macros).
 * - a_link: name of the list-linkage field inside a_type.
 */
#define mpsc_queue_gen(a_attr, a_prefix, a_queue_type, a_type,		\
    a_list_type, a_link)						\
/* Initialize a queue to the empty state (NULL tail). */		\
a_attr void								\
a_prefix##new(a_queue_type *queue) {					\
	atomic_store_p(&queue->tail, NULL, ATOMIC_RELAXED);		\
}									\
/*									\
 * Move every item of src onto the queue, leaving src empty.  An	\
 * already-empty src is a no-op.					\
 */									\
a_attr void								\
a_prefix##push_batch(a_queue_type *queue, a_list_type *src) {		\
	a_type *first = ql_first(src);					\
	if (first == NULL) {						\
		/*							\
		 * Nothing to push.  Proceeding would dereference a	\
		 * NULL first element below.				\
		 */							\
		return;							\
	}								\
	a_type *last = ql_last(src, a_link);				\
	void *cur_tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED);	\
	do {								\
		/*							\
		 * Reuse the ql list prev field as the Treiber stack	\
		 * next field, so the batch's first element points at	\
		 * the previous stack top.  Note that this breaks the	\
		 * queue ring structure; it's not a ring any more!	\
		 */							\
		first->a_link.qre_prev = cur_tail;			\
		/*							\
		 * Note: the upcoming CAS needs only release ordering	\
		 * (no acquire); every push only needs to synchronize	\
		 * with the next pop, which we get from the release	\
		 * sequence rules.					\
		 */							\
	} while (!atomic_compare_exchange_weak_p(&queue->tail,		\
	    &cur_tail, last, ATOMIC_RELEASE, ATOMIC_RELAXED));		\
	ql_new(src);							\
}									\
/* Insert the single item node into the queue. */			\
a_attr void								\
a_prefix##push(a_queue_type *queue, a_type *node) {			\
	/* Wrap node in a one-element list and reuse the batch path. */	\
	ql_elm_new(node, a_link);					\
	a_list_type list;						\
	ql_new(&list);							\
	ql_head_insert(&list, node, a_link);				\
	a_prefix##push_batch(queue, &list);				\
}									\
/*									\
 * Pop all items in the queue into the list at dst.  Items already in	\
 * dst remain there.  Only the single consumer may call this.		\
 */									\
a_attr void								\
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst) {		\
	a_type *tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED);	\
	if (tail == NULL) {						\
		/*							\
		 * In the common special case where there are no	\
		 * pending elements, bail early without a costly RMW.	\
		 */							\
		return;							\
	}								\
	tail = atomic_exchange_p(&queue->tail, NULL, ATOMIC_ACQUIRE);	\
	/*								\
	 * It's a single-consumer queue, so if tail started non-NULL,	\
	 * it'd better stay non-NULL.					\
	 */								\
	assert(tail != NULL);						\
	/*								\
	 * Walk the stack, rebuilding the circular link structure that	\
	 * the list API requires (stack insertion broke it).  It's just	\
	 * as efficient at this point to make the queue a "real" queue,	\
	 * so do that as well.						\
	 * If this ever gets to be a hot spot, we can omit this fixup	\
	 * and make the queue a bag (i.e. not necessarily ordered), but	\
	 * that would mean jettisoning the existing list API as the	\
	 * batch pushing/popping interface.				\
	 */								\
	a_list_type reversed;						\
	ql_new(&reversed);						\
	while (tail != NULL) {						\
		/*							\
		 * Pop an item off the stack, prepend it onto the list	\
		 * (reversing the order).  Recall that we use the	\
		 * list prev field as the Treiber stack next field to	\
		 * preserve order of batch-pushed items when reversed.	\
		 */							\
		a_type *next = tail->a_link.qre_prev;			\
		ql_elm_new(tail, a_link);				\
		ql_head_insert(&reversed, tail, a_link);		\
		tail = next;						\
	}								\
	ql_concat(dst, &reversed, a_link);				\
}
    133 
    134 #endif /* JEMALLOC_INTERNAL_MPSC_QUEUE_H */
    135