/*	$NetBSD: pktqueue.c,v 1.10 2018/08/10 07:24:09 msaitoh Exp $	*/

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Mindaugas Rasiukevicius.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * The packet queue (pktqueue) interface is a lockless IP input queue
 * which also abstracts and handles network ISR scheduling.  It provides
 * a mechanism to enable receiver-side packet steering (RPS).
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: pktqueue.c,v 1.10 2018/08/10 07:24:09 msaitoh Exp $");

#include <sys/param.h>
#include <sys/types.h>

#include <sys/atomic.h>
#include <sys/cpu.h>
#include <sys/kmem.h>
#include <sys/pcq.h>
#include <sys/intr.h>
#include <sys/mbuf.h>
#include <sys/proc.h>
#include <sys/percpu.h>
#include <sys/sysctl.h>

#include <net/pktqueue.h>

/*
 * WARNING: update this if struct pktqueue changes.
 */
#define	PKTQ_CLPAD	\
    MAX(COHERENCY_UNIT, COHERENCY_UNIT - sizeof(kmutex_t) - sizeof(u_int))

struct pktqueue {
	/*
	 * The lock used for a barrier mechanism.  The barrier counter,
	 * as well as the drop counter, are managed atomically though.
	 * Ensure this group is in a separate cache line.
	 */
	kmutex_t	pq_lock;
	volatile u_int	pq_barrier;
	uint8_t		_pad[PKTQ_CLPAD];

	/* The size of the queue, counters and the interrupt handler. */
	u_int		pq_maxlen;
	percpu_t *	pq_counters;
	void *		pq_sih;

	/* Finally, per-CPU queues. */
	pcq_t *		pq_queue[];
};

/* The counters of the packet queue. */
#define	PQCNT_ENQUEUE	0
#define	PQCNT_DEQUEUE	1
#define	PQCNT_DROP	2
#define	PQCNT_NCOUNTERS	3

typedef struct {
	uint64_t	count[PQCNT_NCOUNTERS];
} pktq_counters_t;

/* Special marker value used by pktq_barrier() mechanism. */
#define	PKTQ_MARKER	((void *)(~0ULL))

/*
 * The total size of pktqueue_t which depends on the number of CPUs.
 */
#define	PKTQUEUE_STRUCT_LEN(ncpu)	\
    roundup2(offsetof(pktqueue_t, pq_queue[ncpu]), coherency_unit)

pktqueue_t *
pktq_create(size_t maxlen, void (*intrh)(void *), void *sc)
{
	const u_int sflags = SOFTINT_NET | SOFTINT_MPSAFE | SOFTINT_RCPU;
	const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);
	pktqueue_t *pq;
	percpu_t *pc;
	void *sih;

	pc = percpu_alloc(sizeof(pktq_counters_t));
	if ((sih = softint_establish(sflags, intrh, sc)) == NULL) {
		percpu_free(pc, sizeof(pktq_counters_t));
		return NULL;
	}

	pq = kmem_zalloc(len, KM_SLEEP);
	for (u_int i = 0; i < ncpu; i++) {
		pq->pq_queue[i] = pcq_create(maxlen, KM_SLEEP);
	}
	mutex_init(&pq->pq_lock, MUTEX_DEFAULT, IPL_NONE);
	pq->pq_maxlen = maxlen;
	pq->pq_counters = pc;
	pq->pq_sih = sih;

	return pq;
}
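
/*
 * Illustrative sketch (not part of the original file): a protocol would
 * normally create its input queue once at initialization time and keep
 * the handle around.  The names "example_pktq", "example_init" and
 * "example_intr" are hypothetical; IFQ_MAXLEN (from <net/if.h>) is used
 * here merely as a typical queue length.
 *
 *	static pktqueue_t *example_pktq;
 *
 *	void
 *	example_init(void)
 *	{
 *		example_pktq = pktq_create(IFQ_MAXLEN, example_intr, NULL);
 *		KASSERT(example_pktq != NULL);
 *	}
 */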

void
pktq_destroy(pktqueue_t *pq)
{
	const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);

	for (u_int i = 0; i < ncpu; i++) {
		pcq_t *q = pq->pq_queue[i];
		KASSERT(pcq_peek(q) == NULL);
		pcq_destroy(q);
	}
	percpu_free(pq->pq_counters, sizeof(pktq_counters_t));
	softint_disestablish(pq->pq_sih);
	mutex_destroy(&pq->pq_lock);
	kmem_free(pq, len);
}

/*
 * - pktq_inc_count: increment the counter given an ID.
 * - pktq_collect_counts: handler to sum up the counts from each CPU.
 * - pktq_get_count: return the effective count given an ID.
 */

static inline void
pktq_inc_count(pktqueue_t *pq, u_int i)
{
	percpu_t *pc = pq->pq_counters;
	pktq_counters_t *c;

	c = percpu_getref(pc);
	c->count[i]++;
	percpu_putref(pc);
}

static void
pktq_collect_counts(void *mem, void *arg, struct cpu_info *ci)
{
	const pktq_counters_t *c = mem;
	pktq_counters_t *sum = arg;

	for (u_int i = 0; i < PQCNT_NCOUNTERS; i++) {
		sum->count[i] += c->count[i];
	}
}

uint64_t
pktq_get_count(pktqueue_t *pq, pktq_count_t c)
{
	pktq_counters_t sum;

	if (c != PKTQ_MAXLEN) {
		memset(&sum, 0, sizeof(sum));
		percpu_foreach(pq->pq_counters, pktq_collect_counts, &sum);
	}
	switch (c) {
	case PKTQ_NITEMS:
		return sum.count[PQCNT_ENQUEUE] - sum.count[PQCNT_DEQUEUE];
	case PKTQ_DROPS:
		return sum.count[PQCNT_DROP];
	case PKTQ_MAXLEN:
		return pq->pq_maxlen;
	}
	return 0;
}
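
/*
 * Illustrative sketch: the counters are normally exported through the
 * sysctl helpers at the bottom of this file, but they can also be read
 * directly, e.g. to report the number of dropped packets ("example_pktq"
 * is hypothetical):
 *
 *	uint64_t drops = pktq_get_count(example_pktq, PKTQ_DROPS);
 */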

uint32_t
pktq_rps_hash(const struct mbuf *m __unused)
{
	/*
	 * XXX: No distribution yet; the softnet_lock contention
	 * XXX: must be eliminated first.
	 */
	return 0;
}

/*
 * pktq_enqueue: inject the packet into the end of the queue.
 *
 * => Must be called from interrupt context or with preemption disabled.
 * => Consumes the packet and returns true on success.
 * => Returns false on failure; the caller is responsible for freeing
 *    the packet.
 */
bool
pktq_enqueue(pktqueue_t *pq, struct mbuf *m, const u_int hash __unused)
{
#if defined(_RUMPKERNEL) || defined(_RUMP_NATIVE_ABI)
	const unsigned cpuid = curcpu()->ci_index;
#else
	const unsigned cpuid = hash % ncpu;
#endif

	KASSERT(kpreempt_disabled());

	if (__predict_false(!pcq_put(pq->pq_queue[cpuid], m))) {
		pktq_inc_count(pq, PQCNT_DROP);
		return false;
	}
	softint_schedule_cpu(pq->pq_sih, cpu_lookup(cpuid));
	pktq_inc_count(pq, PQCNT_ENQUEUE);
	return true;
}
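
/*
 * Illustrative sketch (not from the original source): a caller in the
 * packet input path, already running in interrupt context or with
 * preemption otherwise disabled, hands the mbuf over and must free it
 * itself when the queue is full.  "example_pktq" is a hypothetical
 * queue handle:
 *
 *	const uint32_t hash = pktq_rps_hash(m);
 *
 *	if (__predict_false(!pktq_enqueue(example_pktq, m, hash))) {
 *		m_freem(m);
 *		return;
 *	}
 */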

/*
 * pktq_dequeue: take a packet from the queue.
 *
 * => Must be called with preemption disabled.
 * => Must ensure there are no concurrent dequeue calls.
 */
struct mbuf *
pktq_dequeue(pktqueue_t *pq)
{
	const struct cpu_info *ci = curcpu();
	const unsigned cpuid = cpu_index(ci);
	struct mbuf *m;

	m = pcq_get(pq->pq_queue[cpuid]);
	if (__predict_false(m == PKTQ_MARKER)) {
		/* Hit the marker entry: note it for pktq_barrier(). */
		atomic_inc_uint(&pq->pq_barrier);
		return NULL;
	}
	if (__predict_true(m != NULL)) {
		pktq_inc_count(pq, PQCNT_DEQUEUE);
	}
	return m;
}
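
/*
 * Illustrative sketch: the softint handler passed to pktq_create() would
 * normally drain the current CPU's queue in a loop.  "example_pktq" and
 * "example_process" are hypothetical:
 *
 *	static void
 *	example_intr(void *arg)
 *	{
 *		struct mbuf *m;
 *
 *		while ((m = pktq_dequeue(example_pktq)) != NULL) {
 *			example_process(m);
 *		}
 *	}
 */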

/*
 * pktq_barrier: wait for a grace period by which all packets enqueued
 * at the moment of calling this routine will have been processed.  This
 * is used to ensure that e.g. packets referencing some interface have
 * been drained.
 */
void
pktq_barrier(pktqueue_t *pq)
{
	u_int pending = 0;

	mutex_enter(&pq->pq_lock);
	KASSERT(pq->pq_barrier == 0);

	for (u_int i = 0; i < ncpu; i++) {
		pcq_t *q = pq->pq_queue[i];

		/* If the queue is empty, there is nothing to do. */
		if (pcq_peek(q) == NULL) {
			continue;
		}
		/* Otherwise, insert the marker entry. */
		while (!pcq_put(q, PKTQ_MARKER)) {
			kpause("pktqsync", false, 1, NULL);
		}
		kpreempt_disable();
		softint_schedule_cpu(pq->pq_sih, cpu_lookup(i));
		kpreempt_enable();
		pending++;
	}

	/* Wait for each queue to process the markers. */
	while (pq->pq_barrier != pending) {
		kpause("pktqsync", false, 1, NULL);
	}
	pq->pq_barrier = 0;
	mutex_exit(&pq->pq_lock);
}
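
/*
 * Illustrative sketch: a typical use of the barrier is interface detach.
 * Once the interface can no longer feed new packets into the queue,
 * issuing the barrier guarantees that no previously enqueued packet
 * still referencing that interface remains unprocessed ("example_pktq"
 * is hypothetical):
 *
 *	... stop packet input on the interface ...
 *	pktq_barrier(example_pktq);
 *	... now safe to tear down the per-interface state ...
 */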

/*
 * pktq_flush: free mbufs in all queues.
 *
 * => The caller must ensure there are no concurrent writers or flush calls.
 */
void
pktq_flush(pktqueue_t *pq)
{
	struct mbuf *m;

	for (u_int i = 0; i < ncpu; i++) {
		while ((m = pcq_get(pq->pq_queue[i])) != NULL) {
			pktq_inc_count(pq, PQCNT_DEQUEUE);
			m_freem(m);
		}
	}
}

/*
 * pktq_set_maxlen: create per-CPU queues using a new size and replace
 * the existing queues without losing any packets.
 */
int
pktq_set_maxlen(pktqueue_t *pq, size_t maxlen)
{
	const u_int slotbytes = ncpu * sizeof(pcq_t *);
	pcq_t **qs;

	if (!maxlen || maxlen > PCQ_MAXLEN)
		return EINVAL;
	if (pq->pq_maxlen == maxlen)
		return 0;

	/* First, allocate the new queues and replace them. */
	qs = kmem_zalloc(slotbytes, KM_SLEEP);
	for (u_int i = 0; i < ncpu; i++) {
		qs[i] = pcq_create(maxlen, KM_SLEEP);
	}
	mutex_enter(&pq->pq_lock);
	for (u_int i = 0; i < ncpu; i++) {
		/* Swap: store of a word is atomic. */
		pcq_t *q = pq->pq_queue[i];
		pq->pq_queue[i] = qs[i];
		qs[i] = q;
	}
	pq->pq_maxlen = maxlen;
	mutex_exit(&pq->pq_lock);

	/*
	 * At this point, the new packets are flowing into the new
	 * queues.  However, the old queues may have some packets
	 * present which are no longer being processed.  We are going
	 * to re-enqueue them.  This may change the order of packet
	 * arrival, but it is not considered an issue.
	 *
	 * There may be in-flight interrupts calling pktq_dequeue()
	 * which reference the old queues.  Issue a barrier to ensure
	 * that we are going to be the only pcq_get() callers on the
	 * old queues.
	 */
	pktq_barrier(pq);

	for (u_int i = 0; i < ncpu; i++) {
		struct mbuf *m;

		while ((m = pcq_get(qs[i])) != NULL) {
			while (!pcq_put(pq->pq_queue[i], m)) {
				kpause("pktqrenq", false, 1, NULL);
			}
		}
		pcq_destroy(qs[i]);
	}

	/* Well, that was fun. */
	kmem_free(qs, slotbytes);
	return 0;
}

int
sysctl_pktq_maxlen(SYSCTLFN_ARGS, pktqueue_t *pq)
{
	u_int nmaxlen = pktq_get_count(pq, PKTQ_MAXLEN);
	struct sysctlnode node = *rnode;
	int error;

	node.sysctl_data = &nmaxlen;
	error = sysctl_lookup(SYSCTLFN_CALL(&node));
	if (error || newp == NULL)
		return error;
	return pktq_set_maxlen(pq, nmaxlen);
}

int
sysctl_pktq_count(SYSCTLFN_ARGS, pktqueue_t *pq, u_int count_id)
{
	uint64_t count = pktq_get_count(pq, count_id);
	struct sysctlnode node = *rnode;

	node.sysctl_data = &count;
	return sysctl_lookup(SYSCTLFN_CALL(&node));
}
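
/*
 * Illustrative sketch: a protocol usually exposes these helpers through
 * thin SYSCTLFN wrappers that bind its own queue, which are then attached
 * to the sysctl tree with sysctl_createv().  The names below are
 * hypothetical:
 *
 *	static int
 *	example_sysctl_pktq_maxlen(SYSCTLFN_ARGS)
 *	{
 *		return sysctl_pktq_maxlen(SYSCTLFN_CALL(rnode), example_pktq);
 *	}
 */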