/*	$NetBSD: pktqueue.c,v 1.11 2020/02/07 12:35:33 thorpej Exp $	*/

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Mindaugas Rasiukevicius.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * The packet queue (pktqueue) interface is a lockless IP input queue
 * which also abstracts and handles network ISR scheduling. It provides
 * a mechanism to enable receiver-side packet steering (RPS).
 */
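
/*
 * Typical usage (a sketch only; the "frob" names are hypothetical and
 * serve purely to illustrate the interface):
 *
 *	frob_pktq = pktq_create(IFQ_MAXLEN, frob_intr, frob_sc);
 *
 * The receive path hashes and enqueues each packet (see pktq_enqueue),
 * the softint handler drains the per-CPU queue (see pktq_dequeue), and
 * teardown calls pktq_barrier() or pktq_flush() before pktq_destroy().
 */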

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: pktqueue.c,v 1.11 2020/02/07 12:35:33 thorpej Exp $");

#include <sys/param.h>
#include <sys/types.h>

#include <sys/atomic.h>
#include <sys/cpu.h>
#include <sys/pcq.h>
#include <sys/intr.h>
#include <sys/kmem.h>
#include <sys/mbuf.h>
#include <sys/proc.h>
#include <sys/percpu.h>
#include <sys/xcall.h>

#include <net/pktqueue.h>

/*
 * WARNING: update this if struct pktqueue changes.
 */
#define	PKTQ_CLPAD \
    MAX(COHERENCY_UNIT, COHERENCY_UNIT - sizeof(kmutex_t) - sizeof(u_int))

struct pktqueue {
	/*
	 * The lock used for a barrier mechanism. The barrier counter,
	 * as well as the drop counter, are managed atomically though.
	 * Ensure this group is in a separate cache line.
	 */
	kmutex_t	pq_lock;
	volatile u_int	pq_barrier;
	uint8_t		_pad[PKTQ_CLPAD];

	/* The size of the queue, counters and the interrupt handler. */
	u_int		pq_maxlen;
	percpu_t *	pq_counters;
	void *		pq_sih;

	/* Finally, per-CPU queues. */
	pcq_t *		pq_queue[];
};

/* The counters of the packet queue. */
#define	PQCNT_ENQUEUE	0
#define	PQCNT_DEQUEUE	1
#define	PQCNT_DROP	2
#define	PQCNT_NCOUNTERS	3

typedef struct {
	uint64_t	count[PQCNT_NCOUNTERS];
} pktq_counters_t;

/* Special marker value used by pktq_barrier() mechanism. */
#define	PKTQ_MARKER	((void *)(~0ULL))

/*
 * The total size of pktqueue_t which depends on the number of CPUs.
 */
#define	PKTQUEUE_STRUCT_LEN(ncpu) \
    roundup2(offsetof(pktqueue_t, pq_queue[ncpu]), coherency_unit)

pktqueue_t *
pktq_create(size_t maxlen, void (*intrh)(void *), void *sc)
{
	const u_int sflags = SOFTINT_NET | SOFTINT_MPSAFE | SOFTINT_RCPU;
	const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);
	pktqueue_t *pq;
	percpu_t *pc;
	void *sih;

	pc = percpu_alloc(sizeof(pktq_counters_t));
	if ((sih = softint_establish(sflags, intrh, sc)) == NULL) {
		percpu_free(pc, sizeof(pktq_counters_t));
		return NULL;
	}

	pq = kmem_zalloc(len, KM_SLEEP);
	for (u_int i = 0; i < ncpu; i++) {
		pq->pq_queue[i] = pcq_create(maxlen, KM_SLEEP);
	}
	mutex_init(&pq->pq_lock, MUTEX_DEFAULT, IPL_NONE);
	pq->pq_maxlen = maxlen;
	pq->pq_counters = pc;
	pq->pq_sih = sih;

	return pq;
}

void
pktq_destroy(pktqueue_t *pq)
{
	const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);

	for (u_int i = 0; i < ncpu; i++) {
		pcq_t *q = pq->pq_queue[i];
		KASSERT(pcq_peek(q) == NULL);
		pcq_destroy(q);
	}
	percpu_free(pq->pq_counters, sizeof(pktq_counters_t));
	softint_disestablish(pq->pq_sih);
	mutex_destroy(&pq->pq_lock);
	kmem_free(pq, len);
}

/*
 * - pktq_inc_count: increment the counter given an ID.
 * - pktq_collect_counts: handler to sum up the counts from each CPU.
 * - pktq_get_count: return the effective count given an ID.
 */

static inline void
pktq_inc_count(pktqueue_t *pq, u_int i)
{
	percpu_t *pc = pq->pq_counters;
	pktq_counters_t *c;

	c = percpu_getref(pc);
	c->count[i]++;
	percpu_putref(pc);
}

static void
pktq_collect_counts(void *mem, void *arg, struct cpu_info *ci)
{
	const pktq_counters_t *c = mem;
	pktq_counters_t *sum = arg;

	int s = splnet();

	for (u_int i = 0; i < PQCNT_NCOUNTERS; i++) {
		sum->count[i] += c->count[i];
	}

	splx(s);
}

uint64_t
pktq_get_count(pktqueue_t *pq, pktq_count_t c)
{
	pktq_counters_t sum;

	if (c != PKTQ_MAXLEN) {
		memset(&sum, 0, sizeof(sum));
		percpu_foreach_xcall(pq->pq_counters,
		    XC_HIGHPRI_IPL(IPL_SOFTNET), pktq_collect_counts, &sum);
	}
	switch (c) {
	case PKTQ_NITEMS:
		return sum.count[PQCNT_ENQUEUE] - sum.count[PQCNT_DEQUEUE];
	case PKTQ_DROPS:
		return sum.count[PQCNT_DROP];
	case PKTQ_MAXLEN:
		return pq->pq_maxlen;
	}
	return 0;
}

uint32_t
pktq_rps_hash(const struct mbuf *m __unused)
{
	/*
	 * XXX: No distribution yet; the softnet_lock contention
	 * XXX: must be eliminated first.
	 */
	return 0;
}

/*
 * pktq_enqueue: inject the packet into the end of the queue.
 *
 * => Must be called from interrupt context or with preemption disabled.
 * => Consumes the packet and returns true on success.
 * => Returns false on failure; the caller is responsible for freeing
 *    the packet.
 */
bool
pktq_enqueue(pktqueue_t *pq, struct mbuf *m, const u_int hash __unused)
{
#if defined(_RUMPKERNEL) || defined(_RUMP_NATIVE_ABI)
	const unsigned cpuid = curcpu()->ci_index;
#else
	const unsigned cpuid = hash % ncpu;
#endif

	KASSERT(kpreempt_disabled());

	if (__predict_false(!pcq_put(pq->pq_queue[cpuid], m))) {
		pktq_inc_count(pq, PQCNT_DROP);
		return false;
	}
	softint_schedule_cpu(pq->pq_sih, cpu_lookup(cpuid));
	pktq_inc_count(pq, PQCNT_ENQUEUE);
	return true;
}
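
/*
 * For illustration, a receive-path caller might look as follows (a sketch
 * only; "frob_pktq" is a hypothetical queue created with pktq_create()).
 * On failure the packet was not consumed, so the caller frees it:
 *
 *	const uint32_t hash = pktq_rps_hash(m);
 *
 *	kpreempt_disable();
 *	if (__predict_false(!pktq_enqueue(frob_pktq, m, hash))) {
 *		m_freem(m);
 *	}
 *	kpreempt_enable();
 *
 * The kpreempt_disable()/kpreempt_enable() pair is not needed when the
 * caller already runs in interrupt context.
 */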

/*
 * pktq_dequeue: take a packet from the queue.
 *
 * => Must be called with preemption disabled.
 * => Must ensure there are no concurrent dequeue calls.
 */
struct mbuf *
pktq_dequeue(pktqueue_t *pq)
{
	const struct cpu_info *ci = curcpu();
	const unsigned cpuid = cpu_index(ci);
	struct mbuf *m;

	m = pcq_get(pq->pq_queue[cpuid]);
	if (__predict_false(m == PKTQ_MARKER)) {
		/* Note the marker entry. */
		atomic_inc_uint(&pq->pq_barrier);
		return NULL;
	}
	if (__predict_true(m != NULL)) {
		pktq_inc_count(pq, PQCNT_DEQUEUE);
	}
	return m;
}
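
/*
 * For illustration, the interrupt handler passed to pktq_create() would
 * typically drain the current CPU's queue (a sketch only; "frob_pktq" and
 * frob_input() are hypothetical):
 *
 *	static void
 *	frob_intr(void *arg)
 *	{
 *		struct mbuf *m;
 *
 *		while ((m = pktq_dequeue(frob_pktq)) != NULL) {
 *			frob_input(m);
 *		}
 *	}
 *
 * The handler is scheduled per-CPU via softint_schedule_cpu(), so each
 * CPU only ever dequeues from its own queue.
 */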

/*
 * pktq_barrier: waits for a grace period during which all packets
 * enqueued at the moment of calling this routine will have been
 * processed. This is used to ensure that e.g. packets referencing
 * some interface have been drained.
 */
void
pktq_barrier(pktqueue_t *pq)
{
	u_int pending = 0;

	mutex_enter(&pq->pq_lock);
	KASSERT(pq->pq_barrier == 0);

	for (u_int i = 0; i < ncpu; i++) {
		pcq_t *q = pq->pq_queue[i];

		/* If the queue is empty, there is nothing to do. */
		if (pcq_peek(q) == NULL) {
			continue;
		}
		/* Otherwise, insert the marker entry. */
		while (!pcq_put(q, PKTQ_MARKER)) {
			kpause("pktqsync", false, 1, NULL);
		}
		kpreempt_disable();
		softint_schedule_cpu(pq->pq_sih, cpu_lookup(i));
		kpreempt_enable();
		pending++;
	}

	/* Wait for each queue to process the markers. */
	while (pq->pq_barrier != pending) {
		kpause("pktqsync", false, 1, NULL);
	}
	pq->pq_barrier = 0;
	mutex_exit(&pq->pq_lock);
}
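
/*
 * For illustration, a caller releasing state which enqueued packets may
 * still reference could use the barrier as follows (a sketch only; the
 * "frob" names are hypothetical):
 *
 *	frob_stop_enqueueing(sc);	// prevent new packets from arriving
 *	pktq_barrier(frob_pktq);	// wait out already-enqueued packets
 *	frob_release_state(sc);		// now safe to release the state
 */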

/*
 * pktq_flush: free mbufs in all queues.
 *
 * => The caller must ensure there are no concurrent writers or flush calls.
 */
void
pktq_flush(pktqueue_t *pq)
{
	struct mbuf *m;

	for (u_int i = 0; i < ncpu; i++) {
		while ((m = pcq_get(pq->pq_queue[i])) != NULL) {
			pktq_inc_count(pq, PQCNT_DEQUEUE);
			m_freem(m);
		}
	}
}

/*
 * pktq_set_maxlen: create per-CPU queues using a new size and replace
 * the existing queues without losing any packets.
 */
int
pktq_set_maxlen(pktqueue_t *pq, size_t maxlen)
{
	const u_int slotbytes = ncpu * sizeof(pcq_t *);
	pcq_t **qs;

	if (!maxlen || maxlen > PCQ_MAXLEN)
		return EINVAL;
	if (pq->pq_maxlen == maxlen)
		return 0;

	/* First, allocate the new queues and replace them. */
	qs = kmem_zalloc(slotbytes, KM_SLEEP);
	for (u_int i = 0; i < ncpu; i++) {
		qs[i] = pcq_create(maxlen, KM_SLEEP);
	}
	mutex_enter(&pq->pq_lock);
	for (u_int i = 0; i < ncpu; i++) {
		/* Swap: store of a word is atomic. */
		pcq_t *q = pq->pq_queue[i];
		pq->pq_queue[i] = qs[i];
		qs[i] = q;
	}
	pq->pq_maxlen = maxlen;
	mutex_exit(&pq->pq_lock);

	/*
	 * At this point, the new packets are flowing into the new
	 * queues. However, the old queues may have some packets
	 * present which are no longer being processed. We are going
	 * to re-enqueue them. This may change the order of packet
	 * arrival, but it is not considered an issue.
	 *
	 * There may be in-flight interrupts calling pktq_dequeue()
	 * which reference the old queues. Issue a barrier to ensure
	 * that we are going to be the only pcq_get() callers on the
	 * old queues.
	 */
	pktq_barrier(pq);

	for (u_int i = 0; i < ncpu; i++) {
		struct mbuf *m;

		while ((m = pcq_get(qs[i])) != NULL) {
			while (!pcq_put(pq->pq_queue[i], m)) {
				kpause("pktqrenq", false, 1, NULL);
			}
		}
		pcq_destroy(qs[i]);
	}

	/* Well, that was fun. */
	kmem_free(qs, slotbytes);
	return 0;
}

int
sysctl_pktq_maxlen(SYSCTLFN_ARGS, pktqueue_t *pq)
{
	u_int nmaxlen = pktq_get_count(pq, PKTQ_MAXLEN);
	struct sysctlnode node = *rnode;
	int error;

	node.sysctl_data = &nmaxlen;
	error = sysctl_lookup(SYSCTLFN_CALL(&node));
	if (error || newp == NULL)
		return error;
	return pktq_set_maxlen(pq, nmaxlen);
}

int
sysctl_pktq_count(SYSCTLFN_ARGS, pktqueue_t *pq, u_int count_id)
{
	uint64_t count = pktq_get_count(pq, count_id);
	struct sysctlnode node = *rnode;

	node.sysctl_data = &count;
	return sysctl_lookup(SYSCTLFN_CALL(&node));
}
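
/*
 * For illustration, a subsystem would typically expose its queue limit
 * via a thin sysctl wrapper (a sketch only; "frob_pktq" and the wrapper
 * name are hypothetical, and the node itself is created with the usual
 * sysctl_createv() call):
 *
 *	static int
 *	sysctl_frob_pktq_maxlen(SYSCTLFN_ARGS)
 *	{
 *		return sysctl_pktq_maxlen(SYSCTLFN_CALL(rnode), frob_pktq);
 *	}
 */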