/*	$NetBSD: rf_engine.c,v 1.43 2011/04/23 22:22:46 mrg Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: William V. Courtright II, Mark Holland, Rachad Youssef
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/****************************************************************************
 *
 * engine.c -- code for DAG execution engine
 *
 * Modified to work as follows (holland):
 * A user-thread calls into DispatchDAG, which fires off the nodes that
 * are direct successors to the header node.  DispatchDAG then returns,
 * and the rest of the I/O continues asynchronously.  As each node
 * completes, the node execution function calls FinishNode().  FinishNode
 * scans the list of successors to the node and increments the antecedent
 * counts.  Each node that becomes enabled is placed on a central node
 * queue.  A dedicated dag-execution thread grabs nodes off of this
 * queue and fires them.
 *
 * NULL nodes are never fired.
 *
 * Terminator nodes are never fired, but rather cause the callback
 * associated with the DAG to be invoked.
 *
 * If a node fails, the dag either rolls forward to completion or
 * rolls back, undoing previously-completed nodes, and fails atomically.
 * The direction of recovery is determined by the location of the failed
 * node in the graph.  If the failure occurred before the commit node in
 * the graph, backward recovery is used.  Otherwise, forward recovery is
 * used.
 *
 ****************************************************************************/

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.43 2011/04/23 22:22:46 mrg Exp $");

#include <sys/errno.h>

#include "rf_threadstuff.h"
#include "rf_dag.h"
#include "rf_engine.h"
#include "rf_etimer.h"
#include "rf_general.h"
#include "rf_dagutils.h"
#include "rf_shutdown.h"
#include "rf_raid.h"
#include "rf_kintf.h"
#include "rf_paritymap.h"

static void rf_ShutdownEngine(void *);
static void DAGExecutionThread(RF_ThreadArg_t arg);
static void rf_RaidIOThread(RF_ThreadArg_t arg);

/* synchronization primitives for this file.  DO_WAIT should be enclosed in a while loop. */

#define DO_LOCK(_r_) \
do { \
	ks = splbio(); \
	RF_LOCK_MUTEX((_r_)->node_queue_mutex); \
} while (0)

#define DO_UNLOCK(_r_) \
do { \
	RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \
	splx(ks); \
} while (0)

#define DO_WAIT(_r_) \
	RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)

#define DO_SIGNAL(_r_) \
	RF_BROADCAST_COND((_r_)->node_queue)	/* XXX RF_SIGNAL_COND? */

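/*
 * Illustrative usage of the macros above (this mirrors what
 * DAGExecutionThread() does below): the caller must have a local
 * "int ks" in scope for DO_LOCK/DO_UNLOCK, and must re-test its
 * wakeup condition in a loop around DO_WAIT, e.g.
 *
 *	DO_LOCK(raidPtr);
 *	while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL)
 *		DO_WAIT(raidPtr);
 *	...consume raidPtr->node_queue...
 *	DO_UNLOCK(raidPtr);
 */
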
static void
rf_ShutdownEngine(void *arg)
{
	RF_Raid_t *raidPtr;
	int ks;

	raidPtr = (RF_Raid_t *) arg;

	/* Tell the rf_RaidIOThread to shut down */
	mutex_enter(&raidPtr->iodone_lock);

	raidPtr->shutdown_raidio = 1;
	cv_signal(&raidPtr->iodone_cv);

	/* ...and wait for it to tell us it has finished */
	while (raidPtr->shutdown_raidio)
		cv_wait(&raidPtr->iodone_cv, &raidPtr->iodone_lock);

	mutex_exit(&raidPtr->iodone_lock);

	/* Now shut down the DAG execution engine. */
	DO_LOCK(raidPtr);
	raidPtr->shutdown_engine = 1;
	DO_SIGNAL(raidPtr);
	DO_UNLOCK(raidPtr);

	mutex_destroy(&raidPtr->iodone_lock);
	cv_destroy(&raidPtr->iodone_cv);
}

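/*
 * Per-set engine configuration: initialise the iodone queue and its lock,
 * reset the node queue, and create the two worker threads (the DAG engine
 * thread and the raidio helper thread).  rf_ShutdownEngine() is registered
 * on the shutdown list to tear all of this down again.
 */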
int
rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
		   RF_Config_t *cfgPtr)
{

	/*
	 * Initialise iodone for the IO thread.
	 */
	TAILQ_INIT(&(raidPtr->iodone));
	mutex_init(&raidPtr->iodone_lock, MUTEX_DEFAULT, IPL_VM);
	cv_init(&raidPtr->iodone_cv, "raidiow");

	rf_mutex_init(&raidPtr->node_queue_mutex);
	raidPtr->node_queue = NULL;
	raidPtr->dags_in_flight = 0;

	/* we create the execution thread only once per system boot. no need
	 * to check return code b/c the kernel panics if it can't create the
	 * thread. */
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Creating engine thread\n", raidPtr->raidid);
	}
#endif
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread,
				    DAGExecutionThread, raidPtr,
				    "raid%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create engine thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread,
				    rf_RaidIOThread, raidPtr,
				    "raidio%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create raidio thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Created engine thread\n", raidPtr->raidid);
	}
#endif

	/* engine thread is now running and waiting for work */
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid);
	}
#endif
	rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);

	return (0);
}

#if 0
static int
BranchDone(RF_DagNode_t *node)
{
	int i;

	/* return true if forward execution is completed for a node and its
	 * succedents */
	switch (node->status) {
	case rf_wait:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_fired:
		/* node is currently executing, so we're not done */
		return (RF_FALSE);
	case rf_good:
		/* for each succedent recursively check branch */
		for (i = 0; i < node->numSuccedents; i++)
			if (!BranchDone(node->succedents[i]))
				return RF_FALSE;
		return RF_TRUE;	/* node and all succedent branches aren't in
				 * fired state */
	case rf_bad:
		/* succedents can't fire */
		return (RF_TRUE);
	case rf_recover:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_undone:
	case rf_panic:
		/* XXX need to fix this case */
		/* for now, assume that we're done */
		return (RF_TRUE);
	default:
		/* illegal node status */
		RF_PANIC();
		break;
	}
}
#endif

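/*
 * NodeReady() -- decide whether a node may fire.  Under forward execution
 * (rf_enable or rf_rollForward) a node is ready once it is still waiting
 * and all of its antecedents have completed.  Under rf_rollBackward a node
 * is ready to have its undo fired once it completed successfully (rf_good)
 * and all of its succedents are done.
 */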
static int
NodeReady(RF_DagNode_t *node)
{
	int ready;

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		if ((node->status == rf_wait) &&
		    (node->numAntecedents == node->numAntDone))
			ready = RF_TRUE;
		else
			ready = RF_FALSE;
		break;
	case rf_rollBackward:
		RF_ASSERT(node->numSuccDone <= node->numSuccedents);
		RF_ASSERT(node->numSuccFired <= node->numSuccedents);
		RF_ASSERT(node->numSuccFired <= node->numSuccDone);
		if ((node->status == rf_good) &&
		    (node->numSuccDone == node->numSuccedents))
			ready = RF_TRUE;
		else
			ready = RF_FALSE;
		break;
	default:
		printf("Execution engine found illegal DAG status in NodeReady\n");
		RF_PANIC();
		break;
	}

	return (ready);
}


/* user context and dag-exec-thread context: Fire a node.  The node's
 * status field determines which function, do or undo, is fired.
 * This routine assumes that the node's status field has already been
 * set to "fired" or "recover" to indicate the direction of execution.
 */
static void
FireNode(RF_DagNode_t *node)
{
	switch (node->status) {
	case rf_fired:
		/* fire the do function of a node */
#if RF_DEBUG_ENGINE
		if (rf_engineDebug) {
			printf("raid%d: Firing node 0x%lx (%s)\n",
			       node->dagHdr->raidPtr->raidid,
			       (unsigned long) node, node->name);
		}
#endif
		if (node->flags & RF_DAGNODE_FLAG_YIELD) {
#if defined(__NetBSD__) && defined(_KERNEL)
			/* thread_block(); */
			/* printf("Need to block the thread here...\n"); */
			/* XXX thread_block is actually mentioned in
			 * /usr/include/vm/vm_extern.h */
#else
			thread_block();
#endif
		}
		(*(node->doFunc)) (node);
		break;
	case rf_recover:
		/* fire the undo function of a node */
#if RF_DEBUG_ENGINE
		if (rf_engineDebug) {
			printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
			       node->dagHdr->raidPtr->raidid,
			       (unsigned long) node, node->name);
		}
#endif
		if (node->flags & RF_DAGNODE_FLAG_YIELD) {
#if defined(__NetBSD__) && defined(_KERNEL)
			/* thread_block(); */
			/* printf("Need to block the thread here...\n"); */
			/* XXX thread_block is actually mentioned in
			 * /usr/include/vm/vm_extern.h */
#else
			thread_block();
#endif
		}
		(*(node->undoFunc)) (node);
		break;
	default:
		RF_PANIC();
		break;
	}
}


/* user context:
 * Attempt to fire each node in a linear array.
 * The entire list is fired atomically.
 */
static void
FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
{
	RF_DagStatus_t dstat;
	RF_DagNode_t *node;
	int i, j;

	/* first, mark all nodes which are ready to be fired */
	for (i = 0; i < numNodes; i++) {
		node = nodeList[i];
		dstat = node->dagHdr->status;
		RF_ASSERT((node->status == rf_wait) ||
			  (node->status == rf_good));
		if (NodeReady(node)) {
			if ((dstat == rf_enable) ||
			    (dstat == rf_rollForward)) {
				RF_ASSERT(node->status == rf_wait);
				if (node->commitNode)
					node->dagHdr->numCommits++;
				node->status = rf_fired;
				for (j = 0; j < node->numAntecedents; j++)
					node->antecedents[j]->numSuccFired++;
			} else {
				RF_ASSERT(dstat == rf_rollBackward);
				RF_ASSERT(node->status == rf_good);
				/* only one commit node per graph */
				RF_ASSERT(node->commitNode == RF_FALSE);
				node->status = rf_recover;
			}
		}
	}
	/* now, fire the nodes */
	for (i = 0; i < numNodes; i++) {
		if ((nodeList[i]->status == rf_fired) ||
		    (nodeList[i]->status == rf_recover))
			FireNode(nodeList[i]);
	}
}


/* user context:
 * Attempt to fire each node in a linked list.
 * The entire list is fired atomically.
 */
static void
FireNodeList(RF_DagNode_t *nodeList)
{
	RF_DagNode_t *node, *next;
	RF_DagStatus_t dstat;
	int j;

	if (nodeList) {
		/* first, mark all nodes which are ready to be fired */
		for (node = nodeList; node; node = next) {
			next = node->next;
			dstat = node->dagHdr->status;
			RF_ASSERT((node->status == rf_wait) ||
				  (node->status == rf_good));
			if (NodeReady(node)) {
				if ((dstat == rf_enable) ||
				    (dstat == rf_rollForward)) {
					RF_ASSERT(node->status == rf_wait);
					if (node->commitNode)
						node->dagHdr->numCommits++;
					node->status = rf_fired;
					for (j = 0; j < node->numAntecedents; j++)
						node->antecedents[j]->numSuccFired++;
				} else {
					RF_ASSERT(dstat == rf_rollBackward);
					RF_ASSERT(node->status == rf_good);
					/* only one commit node per graph */
					RF_ASSERT(node->commitNode == RF_FALSE);
					node->status = rf_recover;
				}
			}
		}
		/* now, fire the nodes */
		for (node = nodeList; node; node = next) {
			next = node->next;
			if ((node->status == rf_fired) ||
			    (node->status == rf_recover))
				FireNode(node);
		}
	}
}
/* interrupt context:
 * for each succedent
 *    propagate required results from node to succedent
 *    increment succedent's numAntDone
 *    place newly-enabled nodes on node queue for firing
 *
 * To save context switches, we don't place NIL nodes on the node queue,
 * but rather just process them as if they had fired.  Note that NIL nodes
 * that are the direct successors of the header will actually get fired by
 * DispatchDAG, which is fine because no context switches are involved.
 *
 * Important: when running at user level, this can be called by any
 * disk thread, and so the increment and check of the antecedent count
 * must be locked.  I used the node queue mutex and locked down the
 * entire function, but this is certainly overkill.
 */
static void
PropagateResults(RF_DagNode_t *node, int context)
{
	RF_DagNode_t *s, *a;
	RF_Raid_t *raidPtr;
	int i, ks;
	RF_DagNode_t *finishlist = NULL;	/* a list of NIL nodes to be
						 * finished */
	RF_DagNode_t *skiplist = NULL;	/* list of nodes with failed truedata
					 * antecedents */
	RF_DagNode_t *firelist = NULL;	/* a list of nodes to be fired */
	RF_DagNode_t *q = NULL, *qh = NULL, *next;
	int j, skipNode;

	raidPtr = node->dagHdr->raidPtr;

	DO_LOCK(raidPtr);

	/* debug - validate fire counts */
	for (i = 0; i < node->numAntecedents; i++) {
		a = *(node->antecedents + i);
		RF_ASSERT(a->numSuccFired >= a->numSuccDone);
		RF_ASSERT(a->numSuccFired <= a->numSuccedents);
		a->numSuccDone++;
	}

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		for (i = 0; i < node->numSuccedents; i++) {
			s = *(node->succedents + i);
			RF_ASSERT(s->status == rf_wait);
			(s->numAntDone)++;
			if (s->numAntDone == s->numAntecedents) {
				/* look for NIL nodes */
				if (s->doFunc == rf_NullNodeFunc) {
					/* don't fire NIL nodes, just process
					 * them */
					s->next = finishlist;
					finishlist = s;
				} else {
					/* look to see if the node is to be
					 * skipped */
					skipNode = RF_FALSE;
					for (j = 0; j < s->numAntecedents; j++)
						if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad))
							skipNode = RF_TRUE;
					if (skipNode) {
						/* this node has one or more
						 * failed true data
						 * dependencies, so skip it */
						s->next = skiplist;
						skiplist = s;
					} else {
						/* add s to list of nodes (q)
						 * to execute */
						if (context != RF_INTR_CONTEXT) {
							/* we only have to
							 * enqueue if we're at
							 * intr context */
							/* put node on a list
							 * to be fired after
							 * we unlock */
							s->next = firelist;
							firelist = s;
						} else {
							/* enqueue the node for
							 * the dag exec thread
							 * to fire */
							RF_ASSERT(NodeReady(s));
							if (q) {
								q->next = s;
								q = s;
							} else {
								qh = q = s;
								qh->next = NULL;
							}
						}
					}
				}
			}
		}

		if (q) {
			/* xfer our local list of nodes to the node queue */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);

		for (; skiplist; skiplist = next) {
			next = skiplist->next;
			skiplist->status = rf_skipped;
			for (i = 0; i < skiplist->numAntecedents; i++) {
				skiplist->antecedents[i]->numSuccFired++;
			}
			if (skiplist->commitNode) {
				skiplist->dagHdr->numCommits++;
			}
			rf_FinishNode(skiplist, context);
		}
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them */
			next = finishlist->next;
			finishlist->status = rf_good;
			for (i = 0; i < finishlist->numAntecedents; i++) {
				finishlist->antecedents[i]->numSuccFired++;
			}
			if (finishlist->commitNode)
				finishlist->dagHdr->numCommits++;
			/*
			 * Okay, here we're calling rf_FinishNode() on
			 * nodes that have the null function as their
			 * work proc.  Such a node could be the
			 * terminal node in a DAG.  If so, it will
			 * cause the DAG to complete, which will in
			 * turn free memory used by the DAG, which
			 * includes the node in question.  Thus, we
			 * must avoid referencing the node at all
			 * after calling rf_FinishNode() on it. */
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);
		break;

	case rf_rollBackward:
		for (i = 0; i < node->numAntecedents; i++) {
			a = *(node->antecedents + i);
			RF_ASSERT(a->status == rf_good);
			RF_ASSERT(a->numSuccDone <= a->numSuccedents);
			RF_ASSERT(a->numSuccDone <= a->numSuccFired);

			if (a->numSuccDone == a->numSuccFired) {
				if (a->undoFunc == rf_NullNodeFunc) {
					/* don't fire NIL nodes, just process
					 * them */
					a->next = finishlist;
					finishlist = a;
				} else {
					if (context != RF_INTR_CONTEXT) {
						/* we only have to enqueue if
						 * we're at intr context */
						/* put node on a list to be
						 * fired after we unlock */
						a->next = firelist;
						firelist = a;
					} else {
						/* enqueue the node for the
						 * dag exec thread to fire */
						RF_ASSERT(NodeReady(a));
						if (q) {
							q->next = a;
							q = a;
						} else {
							qh = q = a;
							qh->next = NULL;
						}
					}
				}
			}
		}
		if (q) {
			/* xfer our local list of nodes to the node queue */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them */
			next = finishlist->next;
			finishlist->status = rf_good;
			/*
			 * Okay, here we're calling rf_FinishNode() on
			 * nodes that have the null function as their
			 * work proc.  Such a node could be the first
			 * node in a DAG.  If so, it will cause the DAG
			 * to complete, which will in turn free memory
			 * used by the DAG, which includes the node in
			 * question.  Thus, we must avoid referencing
			 * the node at all after calling
			 * rf_FinishNode() on it. */
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);

		break;
	default:
		printf("Engine found illegal DAG status in PropagateResults()\n");
		RF_PANIC();
		break;
	}
}


/*
 * Process a fired node which has completed
 */
static void
ProcessNode(RF_DagNode_t *node, int context)
{
	RF_Raid_t *raidPtr;

	raidPtr = node->dagHdr->raidPtr;

	switch (node->status) {
	case rf_good:
		/* normal case, don't need to do anything */
		break;
	case rf_bad:
		if ((node->dagHdr->numCommits > 0) ||
		    (node->dagHdr->numCommitNodes == 0)) {
			/* crossed commit barrier */
			node->dagHdr->status = rf_rollForward;
#if RF_DEBUG_ENGINE
			if (rf_engineDebug) {
				printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name);
			}
#endif
		} else {
			/* never reached commit barrier */
			node->dagHdr->status = rf_rollBackward;
#if RF_DEBUG_ENGINE
			if (rf_engineDebug) {
				printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name);
			}
#endif
		}
		break;
	case rf_undone:
		/* normal rollBackward case, don't need to do anything */
		break;
	case rf_panic:
		/* an undo node failed!!! */
		printf("UNDO of a node failed!!!\n");
		break;
	default:
		printf("node finished execution with an illegal status!!!\n");
		RF_PANIC();
		break;
	}

	/* enqueue node's succedents (antecedents if rollBackward) for
	 * execution */
	PropagateResults(node, context);
}


/* user context or dag-exec-thread context:
 * This is the first step in post-processing a newly-completed node.
 * This routine is called by each node execution function to mark the node
 * as complete and fire off any successors that have been enabled.
 */
int
rf_FinishNode(RF_DagNode_t *node, int context)
{
	int retcode = RF_FALSE;
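	/* Note: retcode is never modified below, so rf_FinishNode()
	 * currently always returns RF_FALSE. */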
	node->dagHdr->numNodesCompleted++;
	ProcessNode(node, context);

	return (retcode);
}


/* user context: submit dag for execution, return non-zero if we have
 * to wait for completion.  if and only if we return non-zero, we'll
 * cause cbFunc to get invoked with cbArg when the DAG has completed.
 *
 * for now we always return 1.  If the DAG does not cause any I/O,
 * then the callback may get invoked before DispatchDAG returns.
 * There's code in state 5 of ContinueRaidAccess to handle this.
 *
 * All we do here is fire the direct successors of the header node.
 * The DAG execution thread does the rest of the dag processing.  */
int
rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *),
	       void *cbArg)
{
	RF_Raid_t *raidPtr;

	raidPtr = dag->raidPtr;
#if RF_ACC_TRACE > 0
	if (dag->tracerec) {
		RF_ETIMER_START(dag->tracerec->timer);
	}
#endif
#if DEBUG
#if RF_DEBUG_VALIDATE_DAG
	if (rf_engineDebug || rf_validateDAGDebug) {
		if (rf_ValidateDAG(dag))
			RF_PANIC();
	}
#endif
#endif
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
	}
#endif
	raidPtr->dags_in_flight++;	/* debug only:  blow off proper
					 * locking */
	dag->cbFunc = cbFunc;
	dag->cbArg = cbArg;
	dag->numNodesCompleted = 0;
	dag->status = rf_enable;
	FireNodeArray(dag->numSuccedents, dag->succedents);
	return (1);
}
/* dedicated kernel thread: the thread that handles all DAG node
 * firing.  To minimize locking and unlocking, we grab a copy of the
 * entire node queue and then set the node queue to NULL before doing
 * any firing of nodes.  This way we only have to release the lock
 * once.  Of course, it's probably rare that there's more than one
 * node in the queue at any one time, but it sometimes happens.
 */

static void
DAGExecutionThread(RF_ThreadArg_t arg)
{
	RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
	RF_Raid_t *raidPtr;
	int ks;
	int s;

	raidPtr = (RF_Raid_t *) arg;

#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Engine thread is running\n", raidPtr->raidid);
	}
#endif
	s = splbio();

	DO_LOCK(raidPtr);
	while (!raidPtr->shutdown_engine) {

		while (raidPtr->node_queue != NULL) {
			local_nq = raidPtr->node_queue;
			fire_nq = NULL;
			term_nq = NULL;
			raidPtr->node_queue = NULL;
			DO_UNLOCK(raidPtr);

			/* first, strip out the terminal nodes */
			while (local_nq) {
				nd = local_nq;
				local_nq = local_nq->next;
				switch (nd->dagHdr->status) {
				case rf_enable:
				case rf_rollForward:
					if (nd->numSuccedents == 0) {
						/* end of the dag, add to
						 * callback list */
						nd->next = term_nq;
						term_nq = nd;
					} else {
						/* not the end, add to the
						 * fire queue */
						nd->next = fire_nq;
						fire_nq = nd;
					}
					break;
				case rf_rollBackward:
					if (nd->numAntecedents == 0) {
						/* end of the dag, add to the
						 * callback list */
						nd->next = term_nq;
						term_nq = nd;
					} else {
						/* not the end, add to the
						 * fire queue */
						nd->next = fire_nq;
						fire_nq = nd;
					}
					break;
				default:
					RF_PANIC();
					break;
				}
			}

			/* execute callback of dags which have reached the
			 * terminal node */
			while (term_nq) {
				nd = term_nq;
				term_nq = term_nq->next;
				nd->next = NULL;
				(nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
				raidPtr->dags_in_flight--;	/* debug only */
			}

			/* fire remaining nodes */
			FireNodeList(fire_nq);

			DO_LOCK(raidPtr);
		}
		while (!raidPtr->shutdown_engine &&
		       raidPtr->node_queue == NULL) {
			DO_WAIT(raidPtr);
		}
	}
	DO_UNLOCK(raidPtr);

	splx(s);
	kthread_exit(0);
}

/*
 * rf_RaidIOThread() -- When I/O to a component begins, raidstrategy()
 * puts the I/O on a buf_queue, and then signals raidPtr->iodone.  If
 * necessary, this function calls raidstart() to initiate the I/O.
 * When I/O to a component completes, KernelWakeupFunc() puts the
 * completed request onto raidPtr->iodone TAILQ.  This function looks
 * after requests on that queue by calling rf_DiskIOComplete() for the
 * request, and by calling any required CompleteFunc for the request.
 */

static void
rf_RaidIOThread(RF_ThreadArg_t arg)
{
	RF_Raid_t *raidPtr;
	RF_DiskQueueData_t *req;
	int s;

	raidPtr = (RF_Raid_t *) arg;

	s = splbio();
	mutex_enter(&raidPtr->iodone_lock);

	while (!raidPtr->shutdown_raidio) {
		/* if there is nothing to do, then snooze. */
		if (TAILQ_EMPTY(&(raidPtr->iodone)) &&
		    rf_buf_queue_check(raidPtr->raidid)) {
			cv_wait(&raidPtr->iodone_cv, &raidPtr->iodone_lock);
		}

		/* Check for deferred parity-map-related work. */
		if (raidPtr->parity_map != NULL) {
			mutex_exit(&raidPtr->iodone_lock);
			rf_paritymap_checkwork(raidPtr->parity_map);
			mutex_enter(&raidPtr->iodone_lock);
		}

		/* See what I/Os, if any, have arrived */
		while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) {
			TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries);
			mutex_exit(&raidPtr->iodone_lock);
			rf_DiskIOComplete(req->queue, req, req->error);
			(req->CompleteFunc) (req->argument, req->error);
			mutex_enter(&raidPtr->iodone_lock);
		}

		/* process any pending outgoing IO */
		mutex_exit(&raidPtr->iodone_lock);
		raidstart(raidPtr);
		mutex_enter(&raidPtr->iodone_lock);

	}

	/* Let rf_ShutdownEngine know that we're done... */
	raidPtr->shutdown_raidio = 0;
	cv_signal(&raidPtr->iodone_cv);

	mutex_exit(&raidPtr->iodone_lock);
	splx(s);

	kthread_exit(0);
}