/*	$NetBSD: subr_pcq.c,v 1.20 2023/02/24 11:02:27 riastradh Exp $	*/

/*-
 * Copyright (c) 2009, 2019 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Andrew Doran.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Lockless producer/consumer queue.
 *
 * Summary of the producer algorithm in pcq_put (may run many in
 * parallel with each other and with a consumer):
 *
 *	P1. initialize an item
 *
 *	P2. atomic_cas(&pcq->pcq_pc) loop to advance the producer
 *	    pointer, reserving a space at c (fails if not enough space)
 *
 *	P3. atomic_store_release(&pcq->pcq_items[c], item) to publish
 *	    the item in the space it reserved
 *
 * Summary of the consumer algorithm in pcq_get (must be serialized by
 * caller with other consumers, may run in parallel with any number of
 * producers):
 *
 *	C1. atomic_load_relaxed(&pcq->pcq_pc) to get the consumer
 *	    pointer and a snapshot of the producer pointer, which may
 *	    point to null items or point to initialized items (fails if
 *	    no space reserved for published items yet)
 *
 *	C2. atomic_load_consume(&pcq->pcq_items[c]) to get the next
 *	    unconsumed but potentially published item (fails if item
 *	    not published yet)
 *
 *	C3. pcq->pcq_items[c] = NULL to consume the next unconsumed but
 *	    published item
 *
 *	C4. membar_producer
 *
 *	C5. atomic_cas(&pcq->pcq_pc) loop to advance the consumer
 *	    pointer
 *
 *	C6. use the item
 *
 * Note that there is a weird bare membar_producer which is not matched
 * by membar_consumer.  This is one of the rare cases of a memory
 * barrier on one side that is not matched by a memory barrier on
 * another side, but the ordering works out, with a somewhat more
 * involved proof.
 *
 * Some properties that need to be proved:
 *
 *	Theorem 1.  For a pcq_put call that leads into a pcq_get call:
 *	Initializing the item at P1 is dependency-ordered before usage
 *	of the item at C6, so items placed by pcq_put can be safely
 *	used by the caller of pcq_get.
 *
 *	Proof sketch.
 *
 *		Assume load/store P2 synchronizes with load/store C1
 *		(if not, pcq_get fails in `if (p == c) return NULL').
 *
 *		Assume store-release P3 synchronizes with load-consume
 *		C2 (if not, pcq_get fails in `if (item == NULL) return
 *		NULL').
 *
 *		Then:
 *
 *		- P1 is sequenced before store-release P3
 *		- store-release P3 synchronizes with load-consume C2
 *		- load-consume C2 is dependency-ordered before C6
 *
 *		Hence transitively, P1 is dependency-ordered before C6,
 *		QED.
 *
 *	Theorem 2.  For a pcq_get call followed by a pcq_put call:
 *	Nulling out the location at store C3 happens before placing a
 *	new item in the same location at store P3, so items are not
 *	lost.
 *
 *	Proof sketch.
 *
 *		Assume load/store C5 synchronizes with load/store P2
 *		(otherwise pcq_put starts over the CAS loop or fails).
 *
 *		Then:
 *
 *		- store C3 is sequenced before membar_producer C4
 *		- membar_producer C4 is sequenced before load/store C5
 *		- load/store C5 synchronizes with load/store P2 at
 *		  &pcq->pcq_pc
 *		- P2 is sequenced before store-release P3
 *
 *		Hence transitively, store C3 happens before
 *		store-release P3, QED.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.20 2023/02/24 11:02:27 riastradh Exp $");

#include <sys/param.h>
#include <sys/types.h>
#include <sys/atomic.h>
#include <sys/kmem.h>

#include <sys/pcq.h>
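
/*
 * Illustrative sketch, compiled out: one way the producer side is
 * meant to be used, following the P1-P3 summary above.  The item is
 * fully initialized before pcq_put publishes it, and a notification
 * (here a softint, via softint_schedule() from <sys/intr.h>) is
 * generated afterwards so that a consumer which loses the race in
 * pcq_get will come back and retry.  The example_* names are
 * hypothetical and not part of the pcq API.
 */
#if 0
struct example_item {
	int	ei_data;
};

extern pcq_t *example_pcq;
extern void *example_softint_cookie;

static bool
example_produce(struct example_item *item)
{

	item->ei_data = 42;			/* P1: initialize the item */
	if (!pcq_put(example_pcq, item)) {	/* P2 + P3 */
		/* Queue is full: the caller must drop or defer the item. */
		return false;
	}
	/* Wake the consumer; see the comment at the end of pcq_put. */
	softint_schedule(example_softint_cookie);
	return true;
}
#endif
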
/*
 * Internal producer-consumer queue structure.  Note: pcq_t::pcq_pc and
 * pcq_t::pcq_items are each placed on a separate cache line to avoid
 * false sharing.
 */
struct pcq {
	u_int			pcq_nitems;
	uint8_t			pcq_pad1[COHERENCY_UNIT - sizeof(u_int)];
	volatile uint32_t	pcq_pc;
	uint8_t			pcq_pad2[COHERENCY_UNIT - sizeof(uint32_t)];
	void * volatile		pcq_items[];
};

/*
 * Producer (p) - stored in the lower 16 bits of pcq_t::pcq_pc.
 * Consumer (c) - stored in the upper 16 bits.
 *
 * We have a limitation of 16 bits, i.e. at most 0xffff items in the
 * queue.  The PCQ_MAXLEN constant is set accordingly.
 */

static inline void
pcq_split(uint32_t v, u_int *p, u_int *c)
{

	*p = v & 0xffff;
	*c = v >> 16;
}

static inline uint32_t
pcq_combine(u_int p, u_int c)
{

	return p | (c << 16);
}

static inline u_int
pcq_advance(pcq_t *pcq, u_int pc)
{

	if (__predict_false(++pc == pcq->pcq_nitems)) {
		return 0;
	}
	return pc;
}
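
/*
 * Worked example of the encoding above, compiled out: with a producer
 * index of 3 and a consumer index of 5, pcq_combine() yields
 * 0x00050003, and pcq_split() recovers both fields.  The function name
 * is hypothetical.
 */
#if 0
static void
example_encoding(void)
{
	u_int p, c;

	KASSERT(pcq_combine(3, 5) == 0x00050003);
	pcq_split(0x00050003, &p, &c);
	KASSERT(p == 3);
	KASSERT(c == 5);
}
#endif
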
/*
 * pcq_put: place an item at the end of the queue.
 */
bool
pcq_put(pcq_t *pcq, void *item)
{
	uint32_t v, nv;
	u_int op, p, c;

	KASSERT(item != NULL);

	do {
		v = atomic_load_relaxed(&pcq->pcq_pc);
		pcq_split(v, &op, &c);
		p = pcq_advance(pcq, op);
		if (p == c) {
			/* Queue is full. */
			return false;
		}
		nv = pcq_combine(p, c);
	} while (atomic_cas_32(&pcq->pcq_pc, v, nv) != v);

	/*
	 * Ensure that the update to pcq_pc is globally visible before the
	 * data item.  See pcq_get().  This also ensures that any changes
	 * that the caller made to the data item are globally visible
	 * before we put it onto the list.
	 */
	atomic_store_release(&pcq->pcq_items[op], item);

	/*
	 * Synchronization activity to wake up the consumer will ensure
	 * that the update to pcq_items[] is visible before the wakeup
	 * arrives.  So, we do not need an additional memory barrier here.
	 */
	return true;
}

/*
 * pcq_peek: return the next item from the queue without removal.
 */
void *
pcq_peek(pcq_t *pcq)
{
	const uint32_t v = atomic_load_relaxed(&pcq->pcq_pc);
	u_int p, c;

	pcq_split(v, &p, &c);

	/* See comment on race below in pcq_get(). */
	return (p == c) ? NULL : atomic_load_consume(&pcq->pcq_items[c]);
}

/*
 * pcq_get: remove and return the next item for consumption or NULL if empty.
 *
 * => The caller must prevent concurrent gets from occurring.
 */
void *
pcq_get(pcq_t *pcq)
{
	uint32_t v, nv;
	u_int p, c;
	void *item;

	v = atomic_load_relaxed(&pcq->pcq_pc);
	pcq_split(v, &p, &c);
	if (p == c) {
		/* Queue is empty: nothing to return. */
		return NULL;
	}
	item = atomic_load_consume(&pcq->pcq_items[c]);
	if (item == NULL) {
		/*
		 * Raced with sender: we rely on a notification (e.g. softint
		 * or wakeup) being generated after the producer's pcq_put(),
		 * causing us to retry pcq_get() later.
		 */
		return NULL;
	}
	/*
	 * We have exclusive access to this slot, so no need for
	 * atomic_store_*.
	 */
	pcq->pcq_items[c] = NULL;
	c = pcq_advance(pcq, c);
	nv = pcq_combine(p, c);

	/*
	 * Ensure that the update to pcq_items[c] becomes globally visible
	 * before the update to pcq_pc.  If it were reordered to occur
	 * after the pcq_pc update, we could in theory wipe out a
	 * modification made to pcq_items[c] by pcq_put().
	 *
	 * No need for load-before-store ordering of membar_release
	 * because the only load we need to ensure happens first is the
	 * load of pcq->pcq_items[c], but that necessarily happens
	 * before the store to pcq->pcq_items[c] to null it out because
	 * it is at the same memory location.  Yes, this is a bare
	 * membar_producer with no matching membar_consumer.
	 */
	membar_producer();
	while (__predict_false(atomic_cas_32(&pcq->pcq_pc, v, nv) != v)) {
		v = atomic_load_relaxed(&pcq->pcq_pc);
		pcq_split(v, &p, &c);
		c = pcq_advance(pcq, c);
		nv = pcq_combine(p, c);
	}
	return item;
}
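
/*
 * Illustrative sketch, compiled out: the consumer side, e.g. run from
 * a softint handler.  Consumers must be serialized by the caller.  A
 * NULL return can mean either an empty queue or a lost race with a
 * producer mid-publish; in both cases the producer's post-pcq_put
 * notification guarantees another pass.  The example_* names are
 * hypothetical.
 */
#if 0
extern void example_process(void *);

static void
example_consume(void *arg)		/* softint handler */
{
	pcq_t *pcq = arg;
	void *item;

	/* Drain everything currently published. */
	while ((item = pcq_get(pcq)) != NULL) {
		/* C6: safe to use the item, per Theorem 1. */
		example_process(item);
	}
}
#endif
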
pcq_t *
pcq_create(size_t nitems, km_flag_t kmflags)
{
	pcq_t *pcq;

	KASSERT(nitems > 0);
	KASSERT(nitems <= PCQ_MAXLEN);

	pcq = kmem_zalloc(offsetof(pcq_t, pcq_items[nitems]), kmflags);
	if (pcq != NULL) {
		pcq->pcq_nitems = nitems;
	}
	return pcq;
}

void
pcq_destroy(pcq_t *pcq)
{

	kmem_free(pcq, offsetof(pcq_t, pcq_items[pcq->pcq_nitems]));
}

size_t
pcq_maxitems(pcq_t *pcq)
{

	return pcq->pcq_nitems;
}
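
/*
 * Illustrative sketch, compiled out: queue lifecycle.  The size is
 * bounded by PCQ_MAXLEN, reflecting the 16-bit index limit above;
 * with KM_SLEEP the allocation cannot fail.  The queue should be
 * drained and quiescent before pcq_destroy().  The example_* names
 * are hypothetical.
 */
#if 0
static pcq_t *
example_create(void)
{
	pcq_t *pcq;

	pcq = pcq_create(128, KM_SLEEP);	/* 128 slots */
	KASSERT(pcq_maxitems(pcq) == 128);
	return pcq;
}

static void
example_destroy(pcq_t *pcq)
{

	KASSERT(pcq_peek(pcq) == NULL);		/* must be drained first */
	pcq_destroy(pcq);
}
#endif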