/*	$NetBSD: pktqueue.c,v 1.16 2021/12/21 04:09:32 knakahara Exp $	*/

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Mindaugas Rasiukevicius.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * The packet queue (pktqueue) interface is a lockless IP input queue
 * which also abstracts and handles network ISR scheduling.  It provides
 * a mechanism to enable receiver-side packet steering (RPS).
 */
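
/*
 * Typical use, as an illustrative sketch only (the ip_pktq, ip_hash_func
 * and ipintr names stand in for whatever the protocol actually uses and
 * are not defined here): the protocol creates its queue and softint
 * handler once, then enqueues packets from the input path with an RPS
 * hash.
 *
 *	static pktqueue_t *ip_pktq;
 *	static pktq_rps_hash_func_t ip_hash_func;
 *
 * At initialization time:
 *
 *	ip_pktq = pktq_create(IFQ_MAXLEN, ipintr, NULL);
 *	ip_hash_func = pktq_rps_hash_default;
 *
 * On the input path, with preemption disabled:
 *
 *	kpreempt_disable();
 *	if (!pktq_enqueue(ip_pktq, m, pktq_rps_hash(&ip_hash_func, m)))
 *		m_freem(m);
 *	kpreempt_enable();
 *
 * The softint handler (ipintr in this sketch) then drains the per-CPU
 * queue with pktq_dequeue().
 */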

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: pktqueue.c,v 1.16 2021/12/21 04:09:32 knakahara Exp $");

#ifdef _KERNEL_OPT
#include "opt_net_mpsafe.h"
#endif

#include <sys/param.h>
#include <sys/types.h>

#include <sys/atomic.h>
#include <sys/cpu.h>
#include <sys/pcq.h>
#include <sys/intr.h>
#include <sys/mbuf.h>
#include <sys/proc.h>
#include <sys/percpu.h>
#include <sys/xcall.h>

#include <net/pktqueue.h>
#include <net/rss_config.h>

#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/ip6.h>

struct pktqueue {
	/*
	 * The lock used for a barrier mechanism.  The barrier counter,
	 * as well as the drop counter, are managed atomically though.
	 * Ensure this group is in a separate cache line.
	 */
	union {
		struct {
			kmutex_t	pq_lock;
			volatile u_int	pq_barrier;
		};
		uint8_t	 _pad[COHERENCY_UNIT];
	};

	/* The size of the queue, counters and the interrupt handler. */
	u_int		pq_maxlen;
	percpu_t *	pq_counters;
	void *		pq_sih;

	/* Finally, per-CPU queues. */
	struct percpu *	pq_pcq;	/* struct pcq * */
};

/* The counters of the packet queue. */
#define	PQCNT_ENQUEUE	0
#define	PQCNT_DEQUEUE	1
#define	PQCNT_DROP	2
#define	PQCNT_NCOUNTERS	3

typedef struct {
	uint64_t	count[PQCNT_NCOUNTERS];
} pktq_counters_t;

/* Special marker value used by pktq_barrier() mechanism. */
#define	PKTQ_MARKER	((void *)(~0ULL))

static void
pktq_init_cpu(void *vqp, void *vpq, struct cpu_info *ci)
{
	struct pcq **qp = vqp;
	struct pktqueue *pq = vpq;

	*qp = pcq_create(pq->pq_maxlen, KM_SLEEP);
}

static void
pktq_fini_cpu(void *vqp, void *vpq, struct cpu_info *ci)
{
	struct pcq **qp = vqp, *q = *qp;

	KASSERT(pcq_peek(q) == NULL);
	pcq_destroy(q);
	*qp = NULL;		/* paranoia */
}

static struct pcq *
pktq_pcq(struct pktqueue *pq, struct cpu_info *ci)
{
	struct pcq **qp, *q;

	/*
	 * As long as preemption is disabled, the xcall to swap percpu
	 * buffers can't complete, so it is safe to read the pointer.
	 */
	KASSERT(kpreempt_disabled());

	qp = percpu_getptr_remote(pq->pq_pcq, ci);
	q = *qp;

	return q;
}

pktqueue_t *
pktq_create(size_t maxlen, void (*intrh)(void *), void *sc)
{
	const u_int sflags = SOFTINT_NET | SOFTINT_MPSAFE | SOFTINT_RCPU;
	pktqueue_t *pq;
	percpu_t *pc;
	void *sih;

	pc = percpu_alloc(sizeof(pktq_counters_t));
	if ((sih = softint_establish(sflags, intrh, sc)) == NULL) {
		percpu_free(pc, sizeof(pktq_counters_t));
		return NULL;
	}

	pq = kmem_zalloc(sizeof(*pq), KM_SLEEP);
	mutex_init(&pq->pq_lock, MUTEX_DEFAULT, IPL_NONE);
	pq->pq_maxlen = maxlen;
	pq->pq_counters = pc;
	pq->pq_sih = sih;
	pq->pq_pcq = percpu_create(sizeof(struct pcq *),
	    pktq_init_cpu, pktq_fini_cpu, pq);

	return pq;
}

void
pktq_destroy(pktqueue_t *pq)
{

	percpu_free(pq->pq_pcq, sizeof(struct pcq *));
	percpu_free(pq->pq_counters, sizeof(pktq_counters_t));
	softint_disestablish(pq->pq_sih);
	mutex_destroy(&pq->pq_lock);
	kmem_free(pq, sizeof(*pq));
}

/*
 * - pktq_inc_count: increment the counter given an ID.
 * - pktq_collect_counts: handler to sum up the counts from each CPU.
 * - pktq_get_count: return the effective count given an ID.
 */

static inline void
pktq_inc_count(pktqueue_t *pq, u_int i)
{
	percpu_t *pc = pq->pq_counters;
	pktq_counters_t *c;

	c = percpu_getref(pc);
	c->count[i]++;
	percpu_putref(pc);
}

static void
pktq_collect_counts(void *mem, void *arg, struct cpu_info *ci)
{
	const pktq_counters_t *c = mem;
	pktq_counters_t *sum = arg;

	int s = splnet();

	for (u_int i = 0; i < PQCNT_NCOUNTERS; i++) {
		sum->count[i] += c->count[i];
	}

	splx(s);
}

uint64_t
pktq_get_count(pktqueue_t *pq, pktq_count_t c)
{
	pktq_counters_t sum;

	if (c != PKTQ_MAXLEN) {
		memset(&sum, 0, sizeof(sum));
		percpu_foreach_xcall(pq->pq_counters,
		    XC_HIGHPRI_IPL(IPL_SOFTNET), pktq_collect_counts, &sum);
	}
	switch (c) {
	case PKTQ_NITEMS:
		return sum.count[PQCNT_ENQUEUE] - sum.count[PQCNT_DEQUEUE];
	case PKTQ_DROPS:
		return sum.count[PQCNT_DROP];
	case PKTQ_MAXLEN:
		return pq->pq_maxlen;
	}
	return 0;
}

uint32_t
pktq_rps_hash(pktq_rps_hash_func_t *funcp, const struct mbuf *m)
{
	pktq_rps_hash_func_t func = atomic_load_relaxed(funcp);

	KASSERT(func != NULL);

	return (*func)(m);
}

static uint32_t
pktq_rps_hash_zero(const struct mbuf *m __unused)
{

	return 0;
}

static uint32_t
pktq_rps_hash_curcpu(const struct mbuf *m __unused)
{

	return cpu_index(curcpu());
}

static uint32_t
pktq_rps_hash_toeplitz(const struct mbuf *m)
{
	struct ip *ip;
	/*
	 * Disable hashing on the UDP port: IP fragments aren't currently
	 * handled, so including it would leave us with a mix of 2-tuple
	 * and 4-tuple traffic.
	 */
	const u_int flag = RSS_TOEPLITZ_USE_TCP_PORT;

	/* Peek at the IP version. */
	if ((m->m_flags & M_PKTHDR) == 0)
		return 0;

	ip = mtod(m, struct ip *);
	if (ip->ip_v == IPVERSION) {
		if (__predict_false(m->m_len < sizeof(struct ip)))
			return 0;
		return rss_toeplitz_hash_from_mbuf_ipv4(m, flag);
	} else if (ip->ip_v == 6) {
		if (__predict_false(m->m_len < sizeof(struct ip6_hdr)))
			return 0;
		return rss_toeplitz_hash_from_mbuf_ipv6(m, flag);
	}

	return 0;
}

/*
 * Toeplitz hash, excluding the current CPU: the packet is steered to a
 * CPU other than the one taking the interrupt.  Generally, this performs
 * better than plain toeplitz.
 */
static uint32_t
pktq_rps_hash_toeplitz_othercpus(const struct mbuf *m)
{
	uint32_t hash;

	if (ncpu == 1)
		return 0;

	hash = pktq_rps_hash_toeplitz(m);
	hash %= ncpu - 1;
	if (hash >= cpu_index(curcpu()))
		return hash + 1;
	else
		return hash;
}
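
/*
 * For example (purely illustrative): with ncpu = 4 and the interrupt
 * taken on the CPU with index 1, the reduced hash values 0, 1 and 2 map
 * to CPU indices 0, 2 and 3 respectively, so the current CPU is never
 * selected.
 */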

static struct pktq_rps_hash_table {
	const char *		prh_type;
	pktq_rps_hash_func_t	prh_func;
} const pktq_rps_hash_tab[] = {
	{ "zero", pktq_rps_hash_zero },
	{ "curcpu", pktq_rps_hash_curcpu },
	{ "toeplitz", pktq_rps_hash_toeplitz },
	{ "toeplitz-othercpus", pktq_rps_hash_toeplitz_othercpus },
};
const pktq_rps_hash_func_t pktq_rps_hash_default =
#ifdef NET_MPSAFE
	pktq_rps_hash_curcpu;
#else
	pktq_rps_hash_zero;
#endif

static const char *
pktq_get_rps_hash_type(pktq_rps_hash_func_t func)
{

	for (int i = 0; i < __arraycount(pktq_rps_hash_tab); i++) {
		if (func == pktq_rps_hash_tab[i].prh_func) {
			return pktq_rps_hash_tab[i].prh_type;
		}
	}

	return NULL;
}

static int
pktq_set_rps_hash_type(pktq_rps_hash_func_t *func, const char *type)
{

	if (strcmp(type, pktq_get_rps_hash_type(*func)) == 0)
		return 0;

	for (int i = 0; i < __arraycount(pktq_rps_hash_tab); i++) {
		if (strcmp(type, pktq_rps_hash_tab[i].prh_type) == 0) {
			atomic_store_relaxed(func,
			    pktq_rps_hash_tab[i].prh_func);
			return 0;
		}
	}

	return ENOENT;
}

int
sysctl_pktq_rps_hash_handler(SYSCTLFN_ARGS)
{
	struct sysctlnode node;
	pktq_rps_hash_func_t *func;
	int error;
	char type[PKTQ_RPS_HASH_NAME_LEN];

	node = *rnode;
	func = node.sysctl_data;

	strlcpy(type, pktq_get_rps_hash_type(*func), PKTQ_RPS_HASH_NAME_LEN);

	node.sysctl_data = &type;
	node.sysctl_size = sizeof(type);
	error = sysctl_lookup(SYSCTLFN_CALL(&node));
	if (error || newp == NULL)
		return error;

	error = pktq_set_rps_hash_type(func, type);

	return error;
}

/*
 * pktq_enqueue: inject the packet into the end of the queue.
 *
 * => Must be called from interrupt context or with preemption disabled.
 * => Consumes the packet and returns true on success.
 * => Returns false on failure; the caller is responsible for freeing
 *    the packet.
 */
bool
pktq_enqueue(pktqueue_t *pq, struct mbuf *m, const u_int hash __unused)
{
#if defined(_RUMPKERNEL) || defined(_RUMP_NATIVE_ABI)
	struct cpu_info *ci = curcpu();
#else
	struct cpu_info *ci = cpu_lookup(hash % ncpu);
#endif

	KASSERT(kpreempt_disabled());

	if (__predict_false(!pcq_put(pktq_pcq(pq, ci), m))) {
		pktq_inc_count(pq, PQCNT_DROP);
		return false;
	}
	softint_schedule_cpu(pq->pq_sih, ci);
	pktq_inc_count(pq, PQCNT_ENQUEUE);
	return true;
}

/*
 * pktq_dequeue: take a packet from the queue.
 *
 * => Must be called with preemption disabled.
 * => The caller must ensure there are no concurrent dequeue calls.
 */
struct mbuf *
pktq_dequeue(pktqueue_t *pq)
{
	struct cpu_info *ci = curcpu();
	struct mbuf *m;

	KASSERT(kpreempt_disabled());

	m = pcq_get(pktq_pcq(pq, ci));
	if (__predict_false(m == PKTQ_MARKER)) {
		/* Acknowledge the barrier marker. */
		atomic_inc_uint(&pq->pq_barrier);
		return NULL;
	}
	if (__predict_true(m != NULL)) {
		pktq_inc_count(pq, PQCNT_DEQUEUE);
	}
	return m;
}
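
/*
 * Illustrative sketch of a consumer, not taken from any particular
 * protocol (the ipintr and ip_pktq names are assumptions): the softint
 * handler established via pktq_create() typically drains the current
 * CPU's queue like this:
 *
 *	static void
 *	ipintr(void *arg __unused)
 *	{
 *		struct mbuf *m;
 *
 *		while ((m = pktq_dequeue(ip_pktq)) != NULL)
 *			ip_input(m);
 *	}
 *
 * Since the handler runs in softint context, bound to the CPU it was
 * scheduled on, the requirements above are satisfied there.
 */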

/*
 * pktq_barrier: waits for a grace period by which all packets that were
 * enqueued at the moment of calling this routine have been processed.
 * This is used to ensure that, e.g., packets referencing some interface
 * have been drained.
 */
void
pktq_barrier(pktqueue_t *pq)
{
	CPU_INFO_ITERATOR cii;
	struct cpu_info *ci;
	u_int pending = 0;

	mutex_enter(&pq->pq_lock);
	KASSERT(pq->pq_barrier == 0);

	for (CPU_INFO_FOREACH(cii, ci)) {
		struct pcq *q;

		kpreempt_disable();
		q = pktq_pcq(pq, ci);
		kpreempt_enable();

		/* If the queue is empty, there is nothing to do. */
		if (pcq_peek(q) == NULL) {
			continue;
		}
		/* Otherwise, insert the marker and count it as pending. */
		while (!pcq_put(q, PKTQ_MARKER)) {
			kpause("pktqsync", false, 1, NULL);
		}
		kpreempt_disable();
		softint_schedule_cpu(pq->pq_sih, ci);
		kpreempt_enable();
		pending++;
	}

	/* Wait for each queue to process the markers. */
	while (pq->pq_barrier != pending) {
		kpause("pktqsync", false, 1, NULL);
	}
	pq->pq_barrier = 0;
	mutex_exit(&pq->pq_lock);
}
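
/*
 * Illustrative only (the ip_pktq name and the detach scenario are
 * assumptions, not a reference to specific code): a subsystem tearing
 * down per-interface state would drain in-flight packets roughly like:
 *
 *	... remove the interface from the input path ...
 *	pktq_barrier(ip_pktq);
 *	... now no queued packet can still reference the interface ...
 */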

/*
 * pktq_flush: free mbufs in all queues.
 *
 * => The caller must ensure there are no concurrent writers or flush calls.
 */
void
pktq_flush(pktqueue_t *pq)
{
	CPU_INFO_ITERATOR cii;
	struct cpu_info *ci;
	struct mbuf *m;

	for (CPU_INFO_FOREACH(cii, ci)) {
		struct pcq *q;

		kpreempt_disable();
		q = pktq_pcq(pq, ci);
		kpreempt_enable();

		/*
		 * XXX This can't be right -- if the softint is running
		 * then pcq_get isn't safe here.
		 */
		while ((m = pcq_get(q)) != NULL) {
			pktq_inc_count(pq, PQCNT_DEQUEUE);
			m_freem(m);
		}
	}
}

static void
pktq_set_maxlen_cpu(void *vpq, void *vqs)
{
	struct pktqueue *pq = vpq;
	struct pcq **qp, *q, **qs = vqs;
	unsigned i = cpu_index(curcpu());
	int s;

	s = splnet();
	qp = percpu_getref(pq->pq_pcq);
	q = *qp;
	*qp = qs[i];
	qs[i] = q;
	percpu_putref(pq->pq_pcq);
	splx(s);
}

/*
 * pktq_set_maxlen: create per-CPU queues using a new size and replace
 * the existing queues without losing any packets.
 *
 * XXX ncpu must remain stable throughout.
 */
int
pktq_set_maxlen(pktqueue_t *pq, size_t maxlen)
{
	const u_int slotbytes = ncpu * sizeof(pcq_t *);
	pcq_t **qs;

	if (!maxlen || maxlen > PCQ_MAXLEN)
		return EINVAL;
	if (pq->pq_maxlen == maxlen)
		return 0;

	/* First, allocate the new queues. */
	qs = kmem_zalloc(slotbytes, KM_SLEEP);
	for (u_int i = 0; i < ncpu; i++) {
		qs[i] = pcq_create(maxlen, KM_SLEEP);
	}

	/*
	 * Issue an xcall to replace the queue pointers on each CPU.
	 * This implies all the necessary memory barriers.
	 */
	mutex_enter(&pq->pq_lock);
	xc_wait(xc_broadcast(XC_HIGHPRI, pktq_set_maxlen_cpu, pq, qs));
	pq->pq_maxlen = maxlen;
	mutex_exit(&pq->pq_lock);

	/*
	 * At this point, the new packets are flowing into the new
	 * queues.  However, the old queues may have some packets
	 * present which are no longer being processed.  We are going
	 * to re-enqueue them.  This may change the order of packet
	 * arrival, but it is not considered an issue.
	 *
	 * There may be in-flight interrupts calling pktq_dequeue()
	 * which reference the old queues.  Issue a barrier to ensure
	 * that we are going to be the only pcq_get() callers on the
	 * old queues.
	 */
	pktq_barrier(pq);

	for (u_int i = 0; i < ncpu; i++) {
		struct pcq *q;
		struct mbuf *m;

		kpreempt_disable();
		q = pktq_pcq(pq, cpu_lookup(i));
		kpreempt_enable();

		while ((m = pcq_get(qs[i])) != NULL) {
			while (!pcq_put(q, m)) {
				kpause("pktqrenq", false, 1, NULL);
			}
		}
		pcq_destroy(qs[i]);
	}

	/* Well, that was fun. */
	kmem_free(qs, slotbytes);
	return 0;
}

int
sysctl_pktq_maxlen(SYSCTLFN_ARGS, pktqueue_t *pq)
{
	u_int nmaxlen = pktq_get_count(pq, PKTQ_MAXLEN);
	struct sysctlnode node = *rnode;
	int error;

	node.sysctl_data = &nmaxlen;
	error = sysctl_lookup(SYSCTLFN_CALL(&node));
	if (error || newp == NULL)
		return error;
	return pktq_set_maxlen(pq, nmaxlen);
}

int
sysctl_pktq_count(SYSCTLFN_ARGS, pktqueue_t *pq, u_int count_id)
{
	uint64_t count = pktq_get_count(pq, count_id);
	struct sysctlnode node = *rnode;

	node.sysctl_data = &count;
	return sysctl_lookup(SYSCTLFN_CALL(&node));
}
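
/*
 * Illustrative sketch only (the wrapper names and the ip_pktq variable
 * are assumptions, not a description of an existing sysctl tree): a
 * protocol typically exposes these helpers through thin wrappers,
 * roughly:
 *
 *	static int
 *	sysctl_net_inet_ip_pktq_maxlen(SYSCTLFN_ARGS)
 *	{
 *		return sysctl_pktq_maxlen(SYSCTLFN_CALL(rnode), ip_pktq);
 *	}
 *
 *	static int
 *	sysctl_net_inet_ip_pktq_drops(SYSCTLFN_ARGS)
 *	{
 *		return sysctl_pktq_count(SYSCTLFN_CALL(rnode), ip_pktq,
 *		    PKTQ_DROPS);
 *	}
 *
 * which are then attached to the sysctl tree with sysctl_createv().
 */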