Home | History | Annotate | Line # | Download | only in kern
subr_pcq.c revision 1.3.28.1
      1  1.3.28.1    mrg /*	$NetBSD: subr_pcq.c,v 1.3.28.1 2012/02/18 07:35:32 mrg Exp $	*/
      2       1.3  rmind 
      3       1.1   matt /*-
      4  1.3.28.1    mrg  * Copyright (c) 2009 The NetBSD Foundation, Inc.
      5       1.1   matt  * All rights reserved.
      6       1.1   matt  *
      7       1.1   matt  * This code is derived from software contributed to The NetBSD Foundation
      8  1.3.28.1    mrg  * by Andrew Doran.
      9       1.1   matt  *
     10       1.1   matt  * Redistribution and use in source and binary forms, with or without
     11       1.1   matt  * modification, are permitted provided that the following conditions
     12       1.1   matt  * are met:
     13       1.1   matt  * 1. Redistributions of source code must retain the above copyright
     14       1.1   matt  *    notice, this list of conditions and the following disclaimer.
     15       1.1   matt  * 2. Redistributions in binary form must reproduce the above copyright
     16       1.1   matt  *    notice, this list of conditions and the following disclaimer in the
     17       1.1   matt  *    documentation and/or other materials provided with the distribution.
     18       1.1   matt  *
     19       1.1   matt  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
     20       1.1   matt  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     21       1.1   matt  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     22       1.1   matt  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
     23       1.1   matt  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     24       1.1   matt  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     25       1.1   matt  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     26       1.1   matt  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     27       1.1   matt  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     28       1.1   matt  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
     29       1.1   matt  * POSSIBILITY OF SUCH DAMAGE.
     30       1.1   matt  */
     31       1.3  rmind 
     32  1.3.28.1    mrg /*
     33  1.3.28.1    mrg  * Lockless producer/consumer queue.
     34  1.3.28.1    mrg  */
     35  1.3.28.1    mrg 
     36       1.1   matt #include <sys/cdefs.h>
     37  1.3.28.1    mrg __KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.3.28.1 2012/02/18 07:35:32 mrg Exp $");
     38       1.1   matt 
     39       1.1   matt #include <sys/param.h>
     40       1.1   matt #include <sys/types.h>
     41       1.1   matt #include <sys/atomic.h>
     42       1.1   matt #include <sys/kmem.h>
     43       1.1   matt 
     44       1.1   matt #include <sys/pcq.h>
     45       1.1   matt 
/*
 * Internal producer-consumer queue structure.
 *
 * The padding arrays place pcq_nitems, pcq_pc and pcq_items[] in
 * separate COHERENCY_UNIT-sized regions of the structure, so that the
 * pcq_pc word (updated with atomic_cas_32() by both pcq_put() and
 * pcq_get()) does not share a coherency unit with pcq_nitems or with
 * the item slots.  NOTE(review): this only yields distinct cache lines
 * if the allocation itself is COHERENCY_UNIT-aligned -- confirm for
 * kmem_zalloc().
 */
struct pcq {
	/* Number of slots in pcq_items[]; set once by pcq_create(). */
	u_int			pcq_nitems;
	uint8_t			pcq_pad1[COHERENCY_UNIT - sizeof(u_int)];
	/* Packed producer/consumer indices; see pcq_split()/pcq_combine(). */
	volatile uint32_t	pcq_pc;
	uint8_t			pcq_pad2[COHERENCY_UNIT - sizeof(uint32_t)];
	/* Item slots; NULL means "no item stored in this slot (yet)". */
	void * volatile		pcq_items[];
};
     57       1.1   matt 
     58  1.3.28.1    mrg /*
     59  1.3.28.1    mrg  * Producer (p) - stored in the lower 16 bits of pcq_t::pcq_pc.
     60  1.3.28.1    mrg  * Consumer (c) - in the higher 16 bits.
     61  1.3.28.1    mrg  *
     62  1.3.28.1    mrg  * We have a limitation of 16 bits i.e. 0xffff items in the queue.
     63  1.3.28.1    mrg  */
     64       1.3  rmind 
     65  1.3.28.1    mrg static inline void
     66  1.3.28.1    mrg pcq_split(uint32_t v, u_int *p, u_int *c)
     67  1.3.28.1    mrg {
     68       1.1   matt 
     69  1.3.28.1    mrg 	*p = v & 0xffff;
     70  1.3.28.1    mrg 	*c = v >> 16;
     71       1.1   matt }
     72       1.1   matt 
     73  1.3.28.1    mrg static inline uint32_t
     74  1.3.28.1    mrg pcq_combine(u_int p, u_int c)
     75       1.1   matt {
     76       1.1   matt 
     77  1.3.28.1    mrg 	return p | (c << 16);
     78  1.3.28.1    mrg }
     79       1.1   matt 
     80  1.3.28.1    mrg static inline u_int
     81  1.3.28.1    mrg pcq_advance(pcq_t *pcq, u_int pc)
     82  1.3.28.1    mrg {
     83       1.1   matt 
     84  1.3.28.1    mrg 	if (__predict_false(++pc == pcq->pcq_nitems)) {
     85  1.3.28.1    mrg 		return 0;
     86       1.1   matt 	}
     87  1.3.28.1    mrg 	return pc;
     88       1.1   matt }
     89       1.1   matt 
/*
 * pcq_put: place an item at the end of the queue.
 *
 * => "item" must not be NULL: pcq_get() uses a NULL slot to detect a
 *    slot that has been reserved by a producer but not yet filled.
 * => Returns true if the item was enqueued, false if the queue is full.
 *    The queue counts as full when advancing the producer index would
 *    make it equal to the consumer index, so one slot always stays
 *    unused and at most pcq_nitems - 1 items are queued at once.
 */
bool
pcq_put(pcq_t *pcq, void *item)
{
	uint32_t v, nv;
	u_int op, p, c;

	KASSERT(item != NULL);

	/*
	 * Reserve slot "op" (the current producer index) by advancing the
	 * producer index.  If pcq_pc changed between our read and the CAS,
	 * retry with the fresh value.
	 */
	do {
		v = pcq->pcq_pc;
		pcq_split(v, &op, &c);
		p = pcq_advance(pcq, op);
		if (p == c) {
			/* Queue is full. */
			return false;
		}
		nv = pcq_combine(p, c);
	} while (atomic_cas_32(&pcq->pcq_pc, v, nv) != v);

	/*
	 * Ensure that the update to pcq_pc is globally visible before the
	 * data item.  See pcq_get().  This also ensures that any changes
	 * that the caller made to the data item are globally visible
	 * before we put it onto the list.
	 */
#ifndef _HAVE_ATOMIC_AS_MEMBAR
	membar_producer();
#endif
	/* Publish the item into the slot reserved above. */
	pcq->pcq_items[op] = item;

	/*
	 * Synchronization activity to wake up the consumer will ensure
	 * that the update to pcq_items[] is visible before the wakeup
	 * arrives.  So, we do not need an additional memory barrier here.
	 */
	return true;
}
    130       1.1   matt 
    131  1.3.28.1    mrg /*
    132  1.3.28.1    mrg  * pcq_peek: return the next item from the queue without removal.
    133  1.3.28.1    mrg  */
    134       1.1   matt void *
    135       1.1   matt pcq_peek(pcq_t *pcq)
    136       1.1   matt {
    137  1.3.28.1    mrg 	const uint32_t v = pcq->pcq_pc;
    138  1.3.28.1    mrg 	u_int p, c;
    139       1.3  rmind 
    140  1.3.28.1    mrg 	pcq_split(v, &p, &c);
    141  1.3.28.1    mrg 
    142  1.3.28.1    mrg 	/* See comment on race below in pcq_get(). */
    143  1.3.28.1    mrg 	return (p == c) ? NULL : pcq->pcq_items[c];
    144       1.1   matt }
    145       1.1   matt 
/*
 * pcq_get: remove and return the next item for consumption or NULL if empty.
 *
 * => The caller must prevent concurrent gets from occurring.
 * => NULL is also returned when the head slot has been reserved by a
 *    producer whose pcq_put() has not yet stored the item (see below).
 */
void *
pcq_get(pcq_t *pcq)
{
	uint32_t v, nv;
	u_int p, c;
	void *item;

	v = pcq->pcq_pc;
	pcq_split(v, &p, &c);
	if (p == c) {
		/* Queue is empty: nothing to return. */
		return NULL;
	}
	item = pcq->pcq_items[c];
	if (item == NULL) {
		/*
		 * Raced with sender: the producer has advanced the producer
		 * index past this slot but has not stored the item yet.
		 * We rely on a notification (e.g. softint or wakeup) being
		 * generated after the producer's pcq_put(), causing us to
		 * retry pcq_get() later.
		 */
		return NULL;
	}
	/*
	 * Reset the slot to NULL so that, when the slot is reused, the
	 * check above again detects a not-yet-filled slot.
	 */
	pcq->pcq_items[c] = NULL;
	c = pcq_advance(pcq, c);
	nv = pcq_combine(p, c);

	/*
	 * Ensure that update to pcq_items[] becomes globally visible
	 * before the update to pcq_pc.  If it were reordered to occur
	 * after it, we could in theory wipe out a modification made
	 * to pcq_items[] by pcq_put().
	 */
#ifndef _HAVE_ATOMIC_AS_MEMBAR
	membar_producer();
#endif
	/*
	 * With gets serialised by the caller, the CAS can only fail because
	 * a producer changed the producer index.  Re-read pcq_pc, advance
	 * its (unchanged) consumer index again and retry.
	 */
	while (__predict_false(atomic_cas_32(&pcq->pcq_pc, v, nv) != v)) {
		v = pcq->pcq_pc;
		pcq_split(v, &p, &c);
		c = pcq_advance(pcq, c);
		nv = pcq_combine(p, c);
	}
	return item;
}
    194       1.1   matt 
    195       1.1   matt pcq_t *
    196  1.3.28.1    mrg pcq_create(size_t nitems, km_flag_t kmflags)
    197       1.1   matt {
    198       1.1   matt 	pcq_t *pcq;
    199       1.1   matt 
    200  1.3.28.1    mrg 	KASSERT(nitems > 0 || nitems <= 0xffff);
    201       1.1   matt 
    202  1.3.28.1    mrg 	pcq = kmem_zalloc(offsetof(pcq_t, pcq_items[nitems]), kmflags);
    203  1.3.28.1    mrg 	if (pcq == NULL) {
    204       1.1   matt 		return NULL;
    205  1.3.28.1    mrg 	}
    206  1.3.28.1    mrg 	pcq->pcq_nitems = nitems;
    207       1.1   matt 	return pcq;
    208       1.1   matt }
    209       1.1   matt 
    210       1.1   matt void
    211       1.1   matt pcq_destroy(pcq_t *pcq)
    212       1.1   matt {
    213       1.3  rmind 
    214  1.3.28.1    mrg 	kmem_free(pcq, offsetof(pcq_t, pcq_items[pcq->pcq_nitems]));
    215  1.3.28.1    mrg }
    216  1.3.28.1    mrg 
    217  1.3.28.1    mrg size_t
    218  1.3.28.1    mrg pcq_maxitems(pcq_t *pcq)
    219  1.3.28.1    mrg {
    220       1.1   matt 
    221  1.3.28.1    mrg 	return pcq->pcq_nitems;
    222       1.1   matt }
    223