/*	$NetBSD: rf_engine.c,v 1.27 2003/12/29 05:48:13 oster Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: William V. Courtright II, Mark Holland, Rachad Youssef
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/****************************************************************************
 *
 * engine.c -- code for DAG execution engine
 *
 * Modified to work as follows (holland):
 * A user-thread calls into DispatchDAG, which fires off the nodes that
 * are direct successors to the header node.  DispatchDAG then returns,
 * and the rest of the I/O continues asynchronously.  As each node
 * completes, the node execution function calls FinishNode().  FinishNode
 * scans the list of successors to the node and increments the antecedent
 * counts.  Each node that becomes enabled is placed on a central node
 * queue.  A dedicated dag-execution thread grabs nodes off of this
 * queue and fires them.
 *
 * NULL nodes are never fired.
 *
 * Terminator nodes are never fired, but rather cause the callback
 * associated with the DAG to be invoked.
 *
 * If a node fails, the DAG either rolls forward to completion or rolls
 * back, undoing previously-completed nodes, so that the DAG fails
 * atomically.  The direction of recovery is determined by the location
 * of the failed node in the graph.  If the failure occurred before the
 * commit node in the graph, backward recovery is used.  Otherwise,
 * forward recovery is used.
 *
 ****************************************************************************/
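
/*
 * Illustrative sketch only (not part of the original RAIDframe code):
 * a client typically builds a DAG and hands it to the engine with
 * rf_DispatchDAG(), supplying a callback that is invoked once the DAG
 * reaches its terminator node.  The names MyDagDone and acc below are
 * hypothetical; rf_DispatchDAG currently always returns 1, so the caller
 * always waits for the callback.
 *
 *	static void
 *	MyDagDone(void *arg)
 *	{
 *		wakeup(arg);
 *	}
 *
 *	if (rf_DispatchDAG(dag, MyDagDone, acc))
 *		tsleep(acc, PRIBIO, "dagwait", 0);
 */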

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.27 2003/12/29 05:48:13 oster Exp $");

#include <sys/errno.h>

#include "rf_threadstuff.h"
#include "rf_dag.h"
#include "rf_engine.h"
#include "rf_etimer.h"
#include "rf_general.h"
#include "rf_dagutils.h"
#include "rf_shutdown.h"
#include "rf_raid.h"

static void rf_ShutdownEngine(void *);
static void DAGExecutionThread(RF_ThreadArg_t arg);
static void rf_RaidIOThread(RF_ThreadArg_t arg);

/* synchronization primitives for this file.  DO_WAIT should be enclosed in a while loop. */
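/* DO_LOCK raises the interrupt priority level with splbio() and then takes
 * the per-array node_queue_mutex; DO_UNLOCK releases them in the reverse
 * order.  Both expect a local "int ks" to hold the saved spl value, so every
 * DO_LOCK must be paired with a DO_UNLOCK in the same function. */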

#define DO_LOCK(_r_) \
do { \
        ks = splbio(); \
        RF_LOCK_MUTEX((_r_)->node_queue_mutex); \
} while (0)

#define DO_UNLOCK(_r_) \
do { \
        RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \
        splx(ks); \
} while (0)

#define DO_WAIT(_r_) \
        RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)

#define DO_SIGNAL(_r_) \
        RF_BROADCAST_COND((_r_)->node_queue)    /* XXX RF_SIGNAL_COND? */

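/*
 * rf_ShutdownEngine() -- shutdown hook for this RAID set.  Ask
 * rf_RaidIOThread to exit, wait until it acknowledges by clearing
 * shutdown_raidio, then set shutdown_engine and wake the DAG execution
 * thread so that it can exit as well.
 */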
static void
rf_ShutdownEngine(void *arg)
{
        RF_Raid_t *raidPtr;
        int ks;

        raidPtr = (RF_Raid_t *) arg;

        /* Tell the rf_RaidIOThread to shutdown */
        simple_lock(&(raidPtr->iodone_lock));

        raidPtr->shutdown_raidio = 1;
        wakeup(&(raidPtr->iodone));

        /* ...and wait for it to tell us it has finished */
        while (raidPtr->shutdown_raidio)
                ltsleep(&(raidPtr->shutdown_raidio), PRIBIO, "raidshutdown", 0,
                    &(raidPtr->iodone_lock));

        simple_unlock(&(raidPtr->iodone_lock));

        /* Now shut down the DAG execution engine. */
        DO_LOCK(raidPtr);
        raidPtr->shutdown_engine = 1;
        DO_SIGNAL(raidPtr);
        DO_UNLOCK(raidPtr);
}

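/*
 * rf_ConfigureEngine() -- initialize the node queue and its mutex, create
 * the DAG execution thread and the raidio completion thread for this RAID
 * set, and register rf_ShutdownEngine() on the shutdown list.
 */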
int
rf_ConfigureEngine(
    RF_ShutdownList_t ** listp,
    RF_Raid_t * raidPtr,
    RF_Config_t * cfgPtr)
{
        int rc;

        rf_mutex_init(&raidPtr->node_queue_mutex);
        raidPtr->node_queue_cond = 0;
        raidPtr->node_queue = NULL;
        raidPtr->dags_in_flight = 0;

        rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg);
        if (rc)
                return (rc);

        /* we create the execution thread only once per system boot.  no need
         * to check return code b/c the kernel panics if it can't create the
         * thread. */
        if (rf_engineDebug) {
                printf("raid%d: Creating engine thread\n", raidPtr->raidid);
        }
        if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread,
                                    DAGExecutionThread, raidPtr,
                                    "raid%d", raidPtr->raidid)) {
                printf("raid%d: Unable to create engine thread\n",
                       raidPtr->raidid);
                return (ENOMEM);
        }
        if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread,
                                    rf_RaidIOThread, raidPtr,
                                    "raidio%d", raidPtr->raidid)) {
                printf("raid%d: Unable to create raidio thread\n",
                       raidPtr->raidid);
                return (ENOMEM);
        }
        if (rf_engineDebug) {
                printf("raid%d: Created engine thread\n", raidPtr->raidid);
        }
        RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
        /* XXX something is missing here... */
#ifdef debug
        printf("Skipping the WAIT_START!!\n");
#endif
#if 0
        RF_THREADGROUP_WAIT_START(&raidPtr->engine_tg);
#endif
        /* engine thread is now running and waiting for work */
        if (rf_engineDebug) {
                printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid);
        }
        rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);
        if (rc) {
                rf_print_unable_to_add_shutdown(__FILE__, __LINE__, rc);
                rf_ShutdownEngine(raidPtr);
        }
        return (rc);
}

static int
BranchDone(RF_DagNode_t * node)
{
        int i;

        /* return true if forward execution is completed for a node and its
         * succedents */
        switch (node->status) {
        case rf_wait:
                /* should never be called in this state */
                RF_PANIC();
                break;
        case rf_fired:
                /* node is currently executing, so we're not done */
                return (RF_FALSE);
        case rf_good:
                /* for each succedent recursively check branch */
                for (i = 0; i < node->numSuccedents; i++)
                        if (!BranchDone(node->succedents[i]))
                                return RF_FALSE;
                return RF_TRUE; /* node and all succedent branches aren't in
                                 * fired state */
        case rf_bad:
                /* succedents can't fire */
                return (RF_TRUE);
        case rf_recover:
                /* should never be called in this state */
                RF_PANIC();
                break;
        case rf_undone:
        case rf_panic:
                /* XXX need to fix this case */
                /* for now, assume that we're done */
                return (RF_TRUE);
        default:
                /* illegal node status */
                RF_PANIC();
                break;
        }
}

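/*
 * Return RF_TRUE if a node is ready to fire: when executing forward
 * (rf_enable or rf_rollForward), a waiting node is ready once all of its
 * antecedents have completed; when rolling backward, a good node is ready
 * once all of its succedents have completed.
 */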
static int
NodeReady(RF_DagNode_t * node)
{
        int ready;

        switch (node->dagHdr->status) {
        case rf_enable:
        case rf_rollForward:
                if ((node->status == rf_wait) &&
                    (node->numAntecedents == node->numAntDone))
                        ready = RF_TRUE;
                else
                        ready = RF_FALSE;
                break;
        case rf_rollBackward:
                RF_ASSERT(node->numSuccDone <= node->numSuccedents);
                RF_ASSERT(node->numSuccFired <= node->numSuccedents);
                RF_ASSERT(node->numSuccFired <= node->numSuccDone);
                if ((node->status == rf_good) &&
                    (node->numSuccDone == node->numSuccedents))
                        ready = RF_TRUE;
                else
                        ready = RF_FALSE;
                break;
        default:
                printf("Execution engine found illegal DAG status in NodeReady\n");
                RF_PANIC();
                break;
        }

        return (ready);
}


/* user context and dag-exec-thread context:  Fire a node.  The node's
 * status field determines which function, do or undo, is fired.
 * This routine assumes that the node's status field has already been
 * set to "fired" or "recover" to indicate the direction of execution.
 */
static void
FireNode(RF_DagNode_t * node)
{
        switch (node->status) {
        case rf_fired:
                /* fire the do function of a node */
                if (rf_engineDebug) {
                        printf("raid%d: Firing node 0x%lx (%s)\n",
                            node->dagHdr->raidPtr->raidid,
                            (unsigned long) node, node->name);
                }
                if (node->flags & RF_DAGNODE_FLAG_YIELD) {
#if defined(__NetBSD__) && defined(_KERNEL)
                        /* thread_block(); */
                        /* printf("Need to block the thread here...\n"); */
                        /* XXX thread_block is actually mentioned in
                         * /usr/include/vm/vm_extern.h */
#else
                        thread_block();
#endif
                }
                (*(node->doFunc)) (node);
                break;
        case rf_recover:
                /* fire the undo function of a node */
                if (rf_engineDebug) {
                        printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
                            node->dagHdr->raidPtr->raidid,
                            (unsigned long) node, node->name);
                }
                if (node->flags & RF_DAGNODE_FLAG_YIELD) {
#if defined(__NetBSD__) && defined(_KERNEL)
                        /* thread_block(); */
                        /* printf("Need to block the thread here...\n"); */
                        /* XXX thread_block is actually mentioned in
                         * /usr/include/vm/vm_extern.h */
#else
                        thread_block();
#endif
                }
                (*(node->undoFunc)) (node);
                break;
        default:
                RF_PANIC();
                break;
        }
}


/* user context:
 * Attempt to fire each node in a linear array.
 * The entire list is fired atomically.
 */
static void
FireNodeArray(
    int numNodes,
    RF_DagNode_t ** nodeList)
{
        RF_DagStatus_t dstat;
        RF_DagNode_t *node;
        int i, j;

        /* first, mark all nodes which are ready to be fired */
        for (i = 0; i < numNodes; i++) {
                node = nodeList[i];
                dstat = node->dagHdr->status;
                RF_ASSERT((node->status == rf_wait) ||
                          (node->status == rf_good));
                if (NodeReady(node)) {
                        if ((dstat == rf_enable) ||
                            (dstat == rf_rollForward)) {
                                RF_ASSERT(node->status == rf_wait);
                                if (node->commitNode)
                                        node->dagHdr->numCommits++;
                                node->status = rf_fired;
                                for (j = 0; j < node->numAntecedents; j++)
                                        node->antecedents[j]->numSuccFired++;
                        } else {
                                RF_ASSERT(dstat == rf_rollBackward);
                                RF_ASSERT(node->status == rf_good);
                                /* only one commit node per graph */
                                RF_ASSERT(node->commitNode == RF_FALSE);
                                node->status = rf_recover;
                        }
                }
        }
        /* now, fire the nodes */
        for (i = 0; i < numNodes; i++) {
                if ((nodeList[i]->status == rf_fired) ||
                    (nodeList[i]->status == rf_recover))
                        FireNode(nodeList[i]);
        }
}


/* user context:
 * Attempt to fire each node in a linked list.
 * The entire list is fired atomically.
 */
static void
FireNodeList(RF_DagNode_t * nodeList)
{
        RF_DagNode_t *node, *next;
        RF_DagStatus_t dstat;
        int j;

        if (nodeList) {
                /* first, mark all nodes which are ready to be fired */
                for (node = nodeList; node; node = next) {
                        next = node->next;
                        dstat = node->dagHdr->status;
                        RF_ASSERT((node->status == rf_wait) ||
                                  (node->status == rf_good));
                        if (NodeReady(node)) {
                                if ((dstat == rf_enable) ||
                                    (dstat == rf_rollForward)) {
                                        RF_ASSERT(node->status == rf_wait);
                                        if (node->commitNode)
                                                node->dagHdr->numCommits++;
                                        node->status = rf_fired;
                                        for (j = 0; j < node->numAntecedents; j++)
                                                node->antecedents[j]->numSuccFired++;
                                } else {
                                        RF_ASSERT(dstat == rf_rollBackward);
                                        RF_ASSERT(node->status == rf_good);
                                        /* only one commit node per graph */
                                        RF_ASSERT(node->commitNode == RF_FALSE);
                                        node->status = rf_recover;
                                }
                        }
                }
                /* now, fire the nodes */
                for (node = nodeList; node; node = next) {
                        next = node->next;
                        if ((node->status == rf_fired) ||
                            (node->status == rf_recover))
                                FireNode(node);
                }
        }
}
/* interrupt context:
 * for each succedent
 *    propagate required results from node to succedent
 *    increment succedent's numAntDone
 *    place newly-enabled nodes on node queue for firing
 *
 * To save context switches, we don't place NIL nodes on the node queue,
 * but rather just process them as if they had fired.  Note that NIL nodes
 * that are the direct successors of the header will actually get fired by
 * DispatchDAG, which is fine because no context switches are involved.
 *
 * Important:  when running at user level, this can be called by any
 * disk thread, and so the increment and check of the antecedent count
 * must be locked.  I used the node queue mutex and locked down the
 * entire function, but this is certainly overkill.
 */
static void
PropagateResults(
    RF_DagNode_t * node,
    int context)
{
        RF_DagNode_t *s, *a;
        RF_Raid_t *raidPtr;
        int i, ks;
        RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be finished */
        RF_DagNode_t *skiplist = NULL;   /* list of nodes with failed truedata antecedents */
        RF_DagNode_t *firelist = NULL;   /* a list of nodes to be fired */
        RF_DagNode_t *q = NULL, *qh = NULL, *next;
        int j, skipNode;

        raidPtr = node->dagHdr->raidPtr;

        DO_LOCK(raidPtr);

        /* debug - validate fire counts */
        for (i = 0; i < node->numAntecedents; i++) {
                a = *(node->antecedents + i);
                RF_ASSERT(a->numSuccFired >= a->numSuccDone);
                RF_ASSERT(a->numSuccFired <= a->numSuccedents);
                a->numSuccDone++;
        }

        switch (node->dagHdr->status) {
        case rf_enable:
        case rf_rollForward:
                for (i = 0; i < node->numSuccedents; i++) {
                        s = *(node->succedents + i);
                        RF_ASSERT(s->status == rf_wait);
                        (s->numAntDone)++;
                        if (s->numAntDone == s->numAntecedents) {
                                /* look for NIL nodes */
                                if (s->doFunc == rf_NullNodeFunc) {
                                        /* don't fire NIL nodes, just
                                         * process them */
                                        s->next = finishlist;
                                        finishlist = s;
                                } else {
                                        /* look to see if the node is to be
                                         * skipped */
                                        skipNode = RF_FALSE;
                                        for (j = 0; j < s->numAntecedents; j++)
                                                if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad))
                                                        skipNode = RF_TRUE;
                                        if (skipNode) {
                                                /* this node has one or more
                                                 * failed true data
                                                 * dependencies, so skip it */
                                                s->next = skiplist;
                                                skiplist = s;
                                        } else
                                                /* add s to list of nodes (q)
                                                 * to execute */
                                                if (context != RF_INTR_CONTEXT) {
                                                        /* we only have to
                                                         * enqueue if we're at
                                                         * intr context */
                                                        /* put node on a list
                                                         * to be fired after
                                                         * we unlock */
                                                        s->next = firelist;
                                                        firelist = s;
                                                } else {
                                                        /* enqueue the node
                                                         * for the dag exec
                                                         * thread to fire */
                                                        RF_ASSERT(NodeReady(s));
                                                        if (q) {
                                                                q->next = s;
                                                                q = s;
                                                        } else {
                                                                qh = q = s;
                                                                qh->next = NULL;
                                                        }
                                                }
                                }
                        }
                }

                if (q) {
                        /* xfer our local list of nodes to the node queue */
                        q->next = raidPtr->node_queue;
                        raidPtr->node_queue = qh;
                        DO_SIGNAL(raidPtr);
                }
                DO_UNLOCK(raidPtr);

                for (; skiplist; skiplist = next) {
                        next = skiplist->next;
                        skiplist->status = rf_skipped;
                        for (i = 0; i < skiplist->numAntecedents; i++) {
                                skiplist->antecedents[i]->numSuccFired++;
                        }
                        if (skiplist->commitNode) {
                                skiplist->dagHdr->numCommits++;
                        }
                        rf_FinishNode(skiplist, context);
                }
                for (; finishlist; finishlist = next) {
                        /* NIL nodes: no need to fire them */
                        next = finishlist->next;
                        finishlist->status = rf_good;
                        for (i = 0; i < finishlist->numAntecedents; i++) {
                                finishlist->antecedents[i]->numSuccFired++;
                        }
                        if (finishlist->commitNode)
                                finishlist->dagHdr->numCommits++;
                        /*
                         * Okay, here we're calling rf_FinishNode() on
                         * nodes that have the null function as their
                         * work proc.  Such a node could be the
                         * terminal node in a DAG.  If so, it will
                         * cause the DAG to complete, which will in
                         * turn free memory used by the DAG, which
                         * includes the node in question.  Thus, we
                         * must avoid referencing the node at all
                         * after calling rf_FinishNode() on it. */
                        rf_FinishNode(finishlist, context); /* recursive call */
                }
                /* fire all nodes in firelist */
                FireNodeList(firelist);
                break;

        case rf_rollBackward:
                for (i = 0; i < node->numAntecedents; i++) {
                        a = *(node->antecedents + i);
                        RF_ASSERT(a->status == rf_good);
                        RF_ASSERT(a->numSuccDone <= a->numSuccedents);
                        RF_ASSERT(a->numSuccDone <= a->numSuccFired);

                        if (a->numSuccDone == a->numSuccFired) {
                                if (a->undoFunc == rf_NullNodeFunc) {
                                        /* don't fire NIL nodes, just
                                         * process them */
                                        a->next = finishlist;
                                        finishlist = a;
                                } else {
                                        if (context != RF_INTR_CONTEXT) {
                                                /* we only have to enqueue if
                                                 * we're at intr context */
                                                /* put node on a list to be
                                                 * fired after we unlock */
                                                a->next = firelist;
                                                firelist = a;
                                        } else {
                                                /* enqueue the node for the
                                                 * dag exec thread to fire */
                                                RF_ASSERT(NodeReady(a));
                                                if (q) {
                                                        q->next = a;
                                                        q = a;
                                                } else {
                                                        qh = q = a;
                                                        qh->next = NULL;
                                                }
                                        }
                                }
                        }
                }
                if (q) {
                        /* xfer our local list of nodes to the node queue */
                        q->next = raidPtr->node_queue;
                        raidPtr->node_queue = qh;
                        DO_SIGNAL(raidPtr);
                }
                DO_UNLOCK(raidPtr);
                for (; finishlist; finishlist = next) {
                        /* NIL nodes: no need to fire them */
                        next = finishlist->next;
                        finishlist->status = rf_good;
                        /*
                         * Okay, here we're calling rf_FinishNode() on
                         * nodes that have the null function as their
                         * work proc.  Such a node could be the first
                         * node in a DAG.  If so, it will cause the DAG
                         * to complete, which will in turn free memory
                         * used by the DAG, which includes the node in
                         * question.  Thus, we must avoid referencing
                         * the node at all after calling
                         * rf_FinishNode() on it. */
                        rf_FinishNode(finishlist, context); /* recursive call */
                }
                /* fire all nodes in firelist */
                FireNodeList(firelist);

                break;
        default:
                printf("Engine found illegal DAG status in PropagateResults()\n");
                RF_PANIC();
                break;
        }
}


/*
 * Process a fired node which has completed
 */
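/*
 * If the node failed, the direction of recovery is chosen here: roll
 * forward if the DAG has already crossed its commit point (or has no
 * commit nodes at all), roll backward otherwise.  The results are then
 * propagated so that successors (or antecedents, when rolling backward)
 * become eligible to fire.
 */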
static void
ProcessNode(
    RF_DagNode_t * node,
    int context)
{
        RF_Raid_t *raidPtr;

        raidPtr = node->dagHdr->raidPtr;

        switch (node->status) {
        case rf_good:
                /* normal case, don't need to do anything */
                break;
        case rf_bad:
                if ((node->dagHdr->numCommits > 0) ||
                    (node->dagHdr->numCommitNodes == 0)) {
                        /* crossed commit barrier */
                        node->dagHdr->status = rf_rollForward;
                        if (rf_engineDebug || 1) {
                                printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name);
                        }
                } else {
                        /* never reached commit barrier */
                        node->dagHdr->status = rf_rollBackward;
                        if (rf_engineDebug || 1) {
                                printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name);
                        }
                }
                break;
        case rf_undone:
                /* normal rollBackward case, don't need to do anything */
                break;
        case rf_panic:
                /* an undo node failed!!! */
                printf("UNDO of a node failed!!!\n");
                break;
        default:
                printf("node finished execution with an illegal status!!!\n");
                RF_PANIC();
                break;
        }

        /* enqueue node's succedents (antecedents if rollBackward) for
         * execution */
        PropagateResults(node, context);
}


/* user context or dag-exec-thread context:
 * This is the first step in post-processing a newly-completed node.
 * This routine is called by each node execution function to mark the node
 * as complete and fire off any successors that have been enabled.
 */
int
rf_FinishNode(
    RF_DagNode_t * node,
    int context)
{
        int retcode = RF_FALSE;
        node->dagHdr->numNodesCompleted++;
        ProcessNode(node, context);

        return (retcode);
}


/* user context: submit dag for execution, return non-zero if we have
 * to wait for completion.  if and only if we return non-zero, we'll
 * cause cbFunc to get invoked with cbArg when the DAG has completed.
 *
 * for now we always return 1.  If the DAG does not cause any I/O,
 * then the callback may get invoked before DispatchDAG returns.
 * There's code in state 5 of ContinueRaidAccess to handle this.
 *
 * All we do here is fire the direct successors of the header node.
 * The DAG execution thread does the rest of the dag processing. */
int
rf_DispatchDAG(
    RF_DagHeader_t * dag,
    void (*cbFunc) (void *),
    void *cbArg)
{
        RF_Raid_t *raidPtr;

        raidPtr = dag->raidPtr;
        if (dag->tracerec) {
                RF_ETIMER_START(dag->tracerec->timer);
        }
#if DEBUG
#if RF_DEBUG_VALIDATE_DAG
        if (rf_engineDebug || rf_validateDAGDebug) {
                if (rf_ValidateDAG(dag))
                        RF_PANIC();
        }
#endif
#endif
        if (rf_engineDebug) {
                printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
        }
        raidPtr->dags_in_flight++; /* debug only: blow off proper locking */
        dag->cbFunc = cbFunc;
        dag->cbArg = cbArg;
        dag->numNodesCompleted = 0;
        dag->status = rf_enable;
        FireNodeArray(dag->numSuccedents, dag->succedents);
        return (1);
}
/* dedicated kernel thread: the thread that handles all DAG node
 * firing.  To minimize locking and unlocking, we grab a copy of the
 * entire node queue and then set the node queue to NULL before doing
 * any firing of nodes.  This way we only have to release the lock
 * once.  Of course, it's probably rare that there's more than one
 * node in the queue at any one time, but it sometimes happens.
 */

static void
DAGExecutionThread(RF_ThreadArg_t arg)
{
        RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
        RF_Raid_t *raidPtr;
        int ks;
        int s;

        raidPtr = (RF_Raid_t *) arg;

        if (rf_engineDebug) {
                printf("raid%d: Engine thread is running\n", raidPtr->raidid);
        }

        s = splbio();

        RF_THREADGROUP_RUNNING(&raidPtr->engine_tg);

        DO_LOCK(raidPtr);
        while (!raidPtr->shutdown_engine) {

                while (raidPtr->node_queue != NULL) {
                        local_nq = raidPtr->node_queue;
                        fire_nq = NULL;
                        term_nq = NULL;
                        raidPtr->node_queue = NULL;
                        DO_UNLOCK(raidPtr);

                        /* first, strip out the terminal nodes */
                        while (local_nq) {
                                nd = local_nq;
                                local_nq = local_nq->next;
                                switch (nd->dagHdr->status) {
                                case rf_enable:
                                case rf_rollForward:
                                        if (nd->numSuccedents == 0) {
                                                /* end of the dag, add to
                                                 * callback list */
                                                nd->next = term_nq;
                                                term_nq = nd;
                                        } else {
                                                /* not the end, add to the
                                                 * fire queue */
                                                nd->next = fire_nq;
                                                fire_nq = nd;
                                        }
                                        break;
                                case rf_rollBackward:
                                        if (nd->numAntecedents == 0) {
                                                /* end of the dag, add to the
                                                 * callback list */
                                                nd->next = term_nq;
                                                term_nq = nd;
                                        } else {
                                                /* not the end, add to the
                                                 * fire queue */
                                                nd->next = fire_nq;
                                                fire_nq = nd;
                                        }
                                        break;
                                default:
                                        RF_PANIC();
                                        break;
                                }
                        }

                        /* execute callback of dags which have reached the
                         * terminal node */
                        while (term_nq) {
                                nd = term_nq;
                                term_nq = term_nq->next;
                                nd->next = NULL;
                                (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
                                raidPtr->dags_in_flight--; /* debug only */
                        }

                        /* fire remaining nodes */
                        FireNodeList(fire_nq);

                        DO_LOCK(raidPtr);
                }
                while (!raidPtr->shutdown_engine &&
                       raidPtr->node_queue == NULL) {
                        DO_WAIT(raidPtr);
                }
        }
        DO_UNLOCK(raidPtr);

        RF_THREADGROUP_DONE(&raidPtr->engine_tg);

        splx(s);
        kthread_exit(0);
}

/*
 * rf_RaidIOThread() -- When I/O to a component completes,
 * KernelWakeupFunc() puts the completed request onto raidPtr->iodone
 * TAILQ.  This function looks after requests on that queue by calling
 * rf_DiskIOComplete() for the request, and by calling any required
 * CompleteFunc for the request.
 */

static void
rf_RaidIOThread(RF_ThreadArg_t arg)
{
        RF_Raid_t *raidPtr;
        RF_DiskQueueData_t *req;
        int s;

        raidPtr = (RF_Raid_t *) arg;

        s = splbio();
        simple_lock(&(raidPtr->iodone_lock));

        while (!raidPtr->shutdown_raidio) {
                /* if there is nothing to do, then snooze. */
                if (TAILQ_EMPTY(&(raidPtr->iodone))) {
                        ltsleep(&(raidPtr->iodone), PRIBIO, "raidiow", 0,
                            &(raidPtr->iodone_lock));
                }

                /* See what I/Os, if any, have arrived */
                while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) {
                        TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries);
                        simple_unlock(&(raidPtr->iodone_lock));
                        rf_DiskIOComplete(req->queue, req, req->error);
                        (req->CompleteFunc) (req->argument, req->error);
                        simple_lock(&(raidPtr->iodone_lock));
                }
        }

        /* Let rf_ShutdownEngine know that we're done... */
        raidPtr->shutdown_raidio = 0;
        wakeup(&(raidPtr->shutdown_raidio));

        simple_unlock(&(raidPtr->iodone_lock));
        splx(s);

        kthread_exit(0);
}