/*	$NetBSD: rf_dagffwr.c,v 1.18 2004/02/21 20:06:29 oster Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/*
 * rf_dagffwr.c
 *
 * code for creating fault-free write DAGs
 *
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.18 2004/02/21 20:06:29 oster Exp $");

#include <dev/raidframe/raidframevar.h>

#include "rf_raid.h"
#include "rf_dag.h"
#include "rf_dagutils.h"
#include "rf_dagfuncs.h"
#include "rf_debugMem.h"
#include "rf_dagffrd.h"
#include "rf_general.h"
#include "rf_dagffwr.h"

/******************************************************************************
 *
 * General comments on DAG creation:
 *
 * All DAGs in this file use roll-away error recovery.  Each DAG has a single
 * commit node, usually called "Cmt."  If an error occurs before the Cmt node
 * is reached, the execution engine will halt forward execution and work
 * backward through the graph, executing the undo functions.  Assuming that
 * each node in the graph prior to the Cmt node is undoable and atomic, or
 * makes no changes to permanent state, the graph will fail atomically.
 * If an error occurs after the Cmt node executes, the engine will roll
 * forward through the graph, blindly executing nodes until it reaches the
 * end.  If a graph reaches the end, it is assumed to have completed
 * successfully.
 *
 * A graph has only one Cmt node.
 *
 */
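
/*
 * Illustrative sketch only (not RAIDframe code): the commit-point rule
 * described above, with the DAG linearized into an array for brevity.
 * All types and names here are hypothetical stand-ins; the real engine
 * lives in rf_engine.c.
 */
#if 0
struct ex_node {
	int is_commit;				/* the single Cmt node */
	int (*do_func)(struct ex_node *);	/* forward action */
	int (*undo_func)(struct ex_node *);	/* compensating action */
};

static int
ex_execute_graph(struct ex_node *nodes, int n)
{
	int i, committed = 0;

	for (i = 0; i < n; i++) {
		if (nodes[i].do_func(&nodes[i]) != 0 && !committed) {
			/* error before Cmt: work backward, undoing */
			while (--i >= 0)
				nodes[i].undo_func(&nodes[i]);
			return (-1);	/* the graph fails atomically */
		}
		/* errors after Cmt are rolled past, never undone */
		if (nodes[i].is_commit)
			committed = 1;
	}
	return (0);	/* reaching the end means success */
}
#endif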


/******************************************************************************
 *
 * The following wrappers map the standard DAG creation interface to the
 * DAG creation routines.  Additionally, these wrappers enable experimentation
 * with new DAG structures by providing an extra level of indirection, allowing
 * the DAG creation routines to be replaced at this single point.
 */
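
/*
 * For illustration only: the indirection above means an experimental
 * creation routine can be swapped in at this one point.  A hypothetical
 * sketch (the function-pointer type and variable below are not part of
 * RAIDframe):
 */
#if 0
typedef void (*ex_dag_creator_t)(RF_Raid_t *, RF_AccessStripeMap_t *,
    RF_DagHeader_t *, void *, RF_RaidAccessFlags_t, RF_AllocListElem_t *);

/* point this at an experimental routine to test a new DAG structure */
static ex_dag_creator_t ex_small_write_creator = rf_CreateSmallWriteDAG;
#endif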


void
rf_CreateNonRedundantWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
			      RF_DagHeader_t *dag_h, void *bp,
			      RF_RaidAccessFlags_t flags,
			      RF_AllocListElem_t *allocList,
			      RF_IoType_t type)
{
	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
				 RF_IO_TYPE_WRITE);
}

void
rf_CreateRAID0WriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
		       RF_DagHeader_t *dag_h, void *bp,
		       RF_RaidAccessFlags_t flags,
		       RF_AllocListElem_t *allocList,
		       RF_IoType_t type)
{
	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
				 RF_IO_TYPE_WRITE);
}

void
rf_CreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
		       RF_DagHeader_t *dag_h, void *bp,
		       RF_RaidAccessFlags_t flags,
		       RF_AllocListElem_t *allocList)
{
	/* "normal" rollaway */
	rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags,
				     allocList, &rf_xorFuncs, NULL);
}

void
rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
		       RF_DagHeader_t *dag_h, void *bp,
		       RF_RaidAccessFlags_t flags,
		       RF_AllocListElem_t *allocList)
{
	/* "normal" rollaway */
	rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags,
				     allocList, 1, rf_RegularXorFunc, RF_TRUE);
}


/******************************************************************************
 *
 * DAG creation code begins here
 */


/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *           / Rod \           / Wnd \
 * H -- block- Rod - Xor - Cmt - Wnd --- T
 *           \ Rod /           \ Wnp /
 *                             \[Wnq]/
 *
 * The XOR node also does the Q calculation in the P+Q architecture.
 * All nodes before the commit node (Cmt) are assumed to be atomic and
 * undoable, or to make no changes to permanent state.
 *
 * Rod = read old data
 * Cmt = commit node
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              nfaults   - number of faults array can tolerate
 *                          (equal to # redundancy units in stripe)
 *              redfuncs  - list of redundancy generating functions
 *
 *****************************************************************************/
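
/*
 * For reference, the redundancy the Xor node computes for a large write
 * is the bytewise XOR of every data unit in the stripe: new data for the
 * accessed units plus the old data fetched by the Rod nodes, i.e.
 * P = D_0 ^ D_1 ^ ... ^ D_{n-1}.  A minimal illustrative sketch of that
 * computation follows; it is not the RAIDframe implementation (the real
 * work is done by redFunc, e.g. rf_RegularXorFunc).
 */
#if 0
static void
ex_xor_stripe(unsigned char *parity, unsigned char * const *data,
    int ndata, size_t len)
{
	size_t i;
	int j;

	for (i = 0; i < len; i++)
		parity[i] = 0;
	for (j = 0; j < ndata; j++)
		for (i = 0; i < len; i++)
			parity[i] ^= data[j][i];	/* accumulate XOR */
}
#endif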

void
rf_CommonCreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
			     RF_DagHeader_t *dag_h, void *bp,
			     RF_RaidAccessFlags_t flags,
			     RF_AllocListElem_t *allocList,
			     int nfaults, int (*redFunc) (RF_DagNode_t *),
			     int allowBufferRecycle)
{
	RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
	RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
	int nWndNodes, nRodNodes, i, nodeNum, asmNum;
	RF_AccessStripeMapHeader_t *new_asm_h[2];
	RF_StripeNum_t parityStripeID;
	char *sosBuffer, *eosBuffer;
	RF_ReconUnitNum_t which_ru;
	RF_RaidLayout_t *layoutPtr;
	RF_PhysDiskAddr_t *pda;

	layoutPtr = &(raidPtr->Layout);
	parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr,
							asmap->raidAddress,
							&which_ru);

	if (rf_dagDebug) {
		printf("[Creating large-write DAG]\n");
	}
	dag_h->creator = "LargeWriteDAG";

	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
	nWndNodes = asmap->numStripeUnitsAccessed;
	RF_MallocAndAdd(nodes,
			(nWndNodes + 4 + nfaults) * sizeof(RF_DagNode_t),
			(RF_DagNode_t *), allocList);
	i = 0;
	wndNodes = &nodes[i];
	i += nWndNodes;
	xorNode = &nodes[i];
	i += 1;
	wnpNode = &nodes[i];
	i += 1;
	blockNode = &nodes[i];
	i += 1;
	commitNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	if (nfaults == 2) {
		wnqNode = &nodes[i];
		i += 1;
	} else {
		wnqNode = NULL;
	}
	rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
					new_asm_h, &nRodNodes, &sosBuffer,
					&eosBuffer, allocList);
	if (nRodNodes > 0) {
		RF_MallocAndAdd(rodNodes, nRodNodes * sizeof(RF_DagNode_t),
				(RF_DagNode_t *), allocList);
	} else {
		rodNodes = NULL;
	}

	/* begin node initialization */
	if (nRodNodes > 0) {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
			    rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0,
			    dag_h, "Nil", allocList);
	} else {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
			    rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0,
			    dag_h, "Nil", allocList);
	}

	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0,
		    dag_h, "Cmt", allocList);
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
		    rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0,
		    dag_h, "Trm", allocList);

	/* initialize the Rod nodes */
	for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
		if (new_asm_h[asmNum]) {
			pda = new_asm_h[asmNum]->stripeMap->physInfo;
			while (pda) {
				rf_InitNode(&rodNodes[nodeNum], rf_wait,
					    RF_FALSE, rf_DiskReadFunc,
					    rf_DiskReadUndoFunc,
					    rf_GenericWakeupFunc,
					    1, 1, 4, 0, dag_h,
					    "Rod", allocList);
				rodNodes[nodeNum].params[0].p = pda;
				rodNodes[nodeNum].params[1].p = pda->bufPtr;
				rodNodes[nodeNum].params[2].v = parityStripeID;
				rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
				    which_ru);
				nodeNum++;
				pda = pda->next;
			}
		}
	}
	RF_ASSERT(nodeNum == nRodNodes);

	/* initialize the wnd nodes */
	pda = asmap->physInfo;
	for (i = 0; i < nWndNodes; i++) {
		rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0,
			    dag_h, "Wnd", allocList);
		RF_ASSERT(pda != NULL);
		wndNodes[i].params[0].p = pda;
		wndNodes[i].params[1].p = pda->bufPtr;
		wndNodes[i].params[2].v = parityStripeID;
		wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
		pda = pda->next;
	}

	/* initialize the redundancy node */
	if (nRodNodes > 0) {
		rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
			    rf_NullNodeUndoFunc, NULL, 1,
			    nRodNodes, 2 * (nWndNodes + nRodNodes) + 1,
			    nfaults, dag_h, "Xr ", allocList);
	} else {
		rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
			    rf_NullNodeUndoFunc, NULL, 1,
			    1, 2 * (nWndNodes + nRodNodes) + 1,
			    nfaults, dag_h, "Xr ", allocList);
	}
	xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
	for (i = 0; i < nWndNodes; i++) {
		/* pda */
		xorNode->params[2 * i + 0] = wndNodes[i].params[0];
		/* buf ptr */
		xorNode->params[2 * i + 1] = wndNodes[i].params[1];
	}
	for (i = 0; i < nRodNodes; i++) {
		/* pda */
		xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0];
		/* buf ptr */
		xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1];
	}
	/* xor node needs to get at RAID information */
	xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;

	/*
	 * Look for an Rod node that reads a complete SU.  If none,
	 * alloc a buffer to receive the parity info.  Note that we
	 * can't use a new data buffer because it will not have gotten
	 * written when the xor occurs.
	 */
	if (allowBufferRecycle) {
		for (i = 0; i < nRodNodes; i++) {
			if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
				break;
		}
	}
	if ((!allowBufferRecycle) || (i == nRodNodes)) {
		RF_MallocAndAdd(xorNode->results[0],
				rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
				(void *), allocList);
	} else {
		xorNode->results[0] = rodNodes[i].params[1].p;
	}

	/* initialize the Wnp node */
	rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
		    dag_h, "Wnp", allocList);
	wnpNode->params[0].p = asmap->parityInfo;
	wnpNode->params[1].p = xorNode->results[0];
	wnpNode->params[2].v = parityStripeID;
	wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
	/* parityInfo must describe entire parity unit */
	RF_ASSERT(asmap->parityInfo->next == NULL);

	if (nfaults == 2) {
		/*
		 * We never try to recycle a buffer for the Q calculation
		 * in addition to the parity.  This would cause two buffers
		 * to get smashed during the P and Q calculation, guaranteeing
		 * one would be wrong.
		 */
		RF_MallocAndAdd(xorNode->results[1],
				rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
				(void *), allocList);
		rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
			    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc,
			    1, 1, 4, 0, dag_h, "Wnq", allocList);
		wnqNode->params[0].p = asmap->qInfo;
		wnqNode->params[1].p = xorNode->results[1];
		wnqNode->params[2].v = parityStripeID;
		wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
		/* qInfo must describe the entire Q unit */
		RF_ASSERT(asmap->qInfo->next == NULL);
	}
	/*
	 * Connect nodes to form graph.
	 */

	/* connect dag header to block node */
	RF_ASSERT(blockNode->numAntecedents == 0);
	dag_h->succedents[0] = blockNode;

	if (nRodNodes > 0) {
		/* connect the block node to the Rod nodes */
		RF_ASSERT(blockNode->numSuccedents == nRodNodes);
		RF_ASSERT(xorNode->numAntecedents == nRodNodes);
		for (i = 0; i < nRodNodes; i++) {
			RF_ASSERT(rodNodes[i].numAntecedents == 1);
			blockNode->succedents[i] = &rodNodes[i];
			rodNodes[i].antecedents[0] = blockNode;
			rodNodes[i].antType[0] = rf_control;

			/* connect the Rod nodes to the Xor node */
			RF_ASSERT(rodNodes[i].numSuccedents == 1);
			rodNodes[i].succedents[0] = xorNode;
			xorNode->antecedents[i] = &rodNodes[i];
			xorNode->antType[i] = rf_trueData;
		}
	} else {
		/* connect the block node to the Xor node */
		RF_ASSERT(blockNode->numSuccedents == 1);
		RF_ASSERT(xorNode->numAntecedents == 1);
		blockNode->succedents[0] = xorNode;
		xorNode->antecedents[0] = blockNode;
		xorNode->antType[0] = rf_control;
	}

	/* connect the xor node to the commit node */
	RF_ASSERT(xorNode->numSuccedents == 1);
	RF_ASSERT(commitNode->numAntecedents == 1);
	xorNode->succedents[0] = commitNode;
	commitNode->antecedents[0] = xorNode;
	commitNode->antType[0] = rf_control;

	/* connect the commit node to the write nodes */
	RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numAntecedents == 1);
		commitNode->succedents[i] = &wndNodes[i];
		wndNodes[i].antecedents[0] = commitNode;
		wndNodes[i].antType[0] = rf_control;
	}
	RF_ASSERT(wnpNode->numAntecedents == 1);
	commitNode->succedents[nWndNodes] = wnpNode;
	wnpNode->antecedents[0] = commitNode;
	wnpNode->antType[0] = rf_trueData;
	if (nfaults == 2) {
		RF_ASSERT(wnqNode->numAntecedents == 1);
		commitNode->succedents[nWndNodes + 1] = wnqNode;
		wnqNode->antecedents[0] = commitNode;
		wnqNode->antType[0] = rf_trueData;
	}
	/* connect the write nodes to the term node */
	RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
	RF_ASSERT(termNode->numSuccedents == 0);
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numSuccedents == 1);
		wndNodes[i].succedents[0] = termNode;
		termNode->antecedents[i] = &wndNodes[i];
		termNode->antType[i] = rf_control;
	}
	RF_ASSERT(wnpNode->numSuccedents == 1);
	wnpNode->succedents[0] = termNode;
	termNode->antecedents[nWndNodes] = wnpNode;
	termNode->antType[nWndNodes] = rf_control;
	if (nfaults == 2) {
		RF_ASSERT(wnqNode->numSuccedents == 1);
		wnqNode->succedents[0] = termNode;
		termNode->antecedents[nWndNodes + 1] = wnqNode;
		termNode->antType[nWndNodes + 1] = rf_control;
	}
}
/******************************************************************************
 *
 * creates a DAG to perform a small-write operation (either raid 5 or pq),
 * which is as follows:
 *
 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
 *            \- Rod X      /     \----> Wnd [Und] -/
 *           [\- Rod X     /       \---> Wnd [Und] -/]
 *           [\- Roq -> Q /         \--> Wnq [Unq] -/]
 *
 * Rop = read old parity
 * Rod = read old data
 * Roq = read old "q"
 * Cmt = commit node
 * Und = unlock data disk
 * Unp = unlock parity disk
 * Unq = unlock q disk
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [ ] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              pfuncs    - list of parity generating functions
 *              qfuncs    - list of q generating functions
 *
 * A null qfuncs indicates a single-fault-tolerant array.
 *****************************************************************************/
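
/*
 * The Xor/Q nodes below implement the usual read-modify-write identity
 * for a small write: new parity = old parity ^ old data ^ new data.
 * A minimal illustrative sketch of that identity (not the RAIDframe
 * implementation; the real work is done by the functions passed in
 * pfuncs and qfuncs):
 */
#if 0
static void
ex_small_write_parity(unsigned char *newp, const unsigned char *oldp,
    const unsigned char *oldd, const unsigned char *newd, size_t len)
{
	size_t i;

	for (i = 0; i < len; i++)
		newp[i] = oldp[i] ^ oldd[i] ^ newd[i];
}
#endif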

void
rf_CommonCreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
			     RF_DagHeader_t *dag_h, void *bp,
			     RF_RaidAccessFlags_t flags,
			     RF_AllocListElem_t *allocList,
			     const RF_RedFuncs_t *pfuncs,
			     const RF_RedFuncs_t *qfuncs)
{
	RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
	RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
	RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
	int i, j, nNodes, totalNumNodes;
	RF_ReconUnitNum_t which_ru;
	int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
	int (*qfunc) (RF_DagNode_t *);
	int numDataNodes, numParityNodes;
	RF_StripeNum_t parityStripeID;
	RF_PhysDiskAddr_t *pda;
	char *name, *qname;
	long nfaults;

	nfaults = qfuncs ? 2 : 1;

	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	pda = asmap->physInfo;
	numDataNodes = asmap->numStripeUnitsAccessed;
	numParityNodes = (asmap->parityInfo->next) ? 2 : 1;

	if (rf_dagDebug) {
		printf("[Creating small-write DAG]\n");
	}
	RF_ASSERT(numDataNodes > 0);
	dag_h->creator = "SmallWriteDAG";

	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/*
	 * DAG creation occurs in four steps:
	 * 1. count the number of nodes in the DAG
	 * 2. create the nodes
	 * 3. initialize the nodes
	 * 4. connect the nodes
	 */

	/*
	 * Step 1. compute number of nodes in the graph
	 */

	/*
	 * Number of nodes: a read and a write for each data unit, a
	 * redundancy computation node for each parity unit (and each Q
	 * unit when nfaults == 2), a read and a write for each parity
	 * unit (and each Q unit), plus a block node, a commit node, and
	 * a terminate node.
	 */
	totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
	    + (nfaults * 2 * numParityNodes) + 3;
	/*
	 * Step 2. create the nodes
	 */
	RF_MallocAndAdd(nodes, totalNumNodes * sizeof(RF_DagNode_t),
			(RF_DagNode_t *), allocList);
	i = 0;
	blockNode = &nodes[i];
	i += 1;
	commitNode = &nodes[i];
	i += 1;
	readDataNodes = &nodes[i];
	i += numDataNodes;
	readParityNodes = &nodes[i];
	i += numParityNodes;
	writeDataNodes = &nodes[i];
	i += numDataNodes;
	writeParityNodes = &nodes[i];
	i += numParityNodes;
	xorNodes = &nodes[i];
	i += numParityNodes;
	termNode = &nodes[i];
	i += 1;

	if (nfaults == 2) {
		readQNodes = &nodes[i];
		i += numParityNodes;
		writeQNodes = &nodes[i];
		i += numParityNodes;
		qNodes = &nodes[i];
		i += numParityNodes;
	} else {
		readQNodes = writeQNodes = qNodes = NULL;
	}
	RF_ASSERT(i == totalNumNodes);

	/*
	 * Step 3. initialize the nodes
	 */
	/* initialize block node (Nil) */
	nNodes = numDataNodes + (nfaults * numParityNodes);
	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0,
		    dag_h, "Nil", allocList);

	/* initialize commit node (Cmt) */
	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, nNodes,
		    (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);

	/* initialize terminate node (Trm) */
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
		    rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0,
		    dag_h, "Trm", allocList);

	/* initialize nodes which read old data (Rod) */
	for (i = 0; i < numDataNodes; i++) {
		rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
			    rf_DiskReadFunc, rf_DiskReadUndoFunc,
			    rf_GenericWakeupFunc, (nfaults * numParityNodes),
			    1, 4, 0, dag_h, "Rod", allocList);
		RF_ASSERT(pda != NULL);
		/* physical disk addr desc */
		readDataNodes[i].params[0].p = pda;
		/* buffer to hold old data */
		readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
		    dag_h, pda, allocList);
		readDataNodes[i].params[2].v = parityStripeID;
		readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    which_ru);
		pda = pda->next;
		for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
			readDataNodes[i].propList[j] = NULL;
		}
	}

	/* initialize nodes which read old parity (Rop) */
	pda = asmap->parityInfo;
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
			    rf_DiskReadFunc, rf_DiskReadUndoFunc,
			    rf_GenericWakeupFunc, numParityNodes, 1, 4, 0,
			    dag_h, "Rop", allocList);
		readParityNodes[i].params[0].p = pda;
		/* buffer to hold old parity */
		readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
		    dag_h, pda, allocList);
		readParityNodes[i].params[2].v = parityStripeID;
		readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    which_ru);
		pda = pda->next;
		for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
			readParityNodes[i].propList[j] = NULL;
		}
	}

	/* initialize nodes which read old Q (Roq) */
	if (nfaults == 2) {
		pda = asmap->qInfo;
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(pda != NULL);
			rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
				    rf_DiskReadFunc, rf_DiskReadUndoFunc,
				    rf_GenericWakeupFunc, numParityNodes,
				    1, 4, 0, dag_h, "Roq", allocList);
			readQNodes[i].params[0].p = pda;
			/* buffer to hold old Q */
			readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
			    dag_h, pda, allocList);
			readQNodes[i].params[2].v = parityStripeID;
			readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
			    which_ru);
			pda = pda->next;
			for (j = 0; j < readQNodes[i].numSuccedents; j++) {
				readQNodes[i].propList[j] = NULL;
			}
		}
	}
	/* initialize nodes which write new data (Wnd) */
	pda = asmap->physInfo;
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
			    "Wnd", allocList);
		/* physical disk addr desc */
		writeDataNodes[i].params[0].p = pda;
		/* buffer holding new data to be written */
		writeDataNodes[i].params[1].p = pda->bufPtr;
		writeDataNodes[i].params[2].v = parityStripeID;
		writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    which_ru);
		pda = pda->next;
	}

	/*
	 * Initialize nodes which compute new parity and Q.
	 */
	/*
	 * We use the simple XOR func in the double-XOR case, and when
	 * we're accessing only a portion of one stripe unit.  The
	 * distinction between the two is that the regular XOR func
	 * assumes that the targbuf is a full SU in size, and examines
	 * the pda associated with the buffer to decide where within
	 * the buffer to XOR the data, whereas the simple XOR func
	 * just XORs the data into the start of the buffer.
	 */
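	/*
	 * Illustratively (hypothetical helpers, not the RAIDframe
	 * functions themselves), the two variants differ only in the
	 * destination offset:
	 *
	 *	simple:  for (i = 0; i < len; i++)
	 *	             targbuf[i] ^= src[i];
	 *	regular: for (i = 0; i < len; i++)
	 *	             targbuf[pda_offset + i] ^= src[i];
	 *
	 * where pda_offset is the data's position within the full
	 * stripe unit, as described by the buffer's pda.
	 */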
	if ((numParityNodes == 2) || ((numDataNodes == 1)
	    && (asmap->totalSectorsAccessed <
		raidPtr->Layout.sectorsPerStripeUnit))) {
		func = pfuncs->simple;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->SimpleName;
		if (qfuncs) {
			qfunc = qfuncs->simple;
			qname = qfuncs->SimpleName;
		} else {
			qfunc = NULL;
			qname = NULL;
		}
	} else {
		func = pfuncs->regular;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->RegularName;
		if (qfuncs) {
			qfunc = qfuncs->regular;
			qname = qfuncs->RegularName;
		} else {
			qfunc = NULL;
			qname = NULL;
		}
	}
	/*
	 * Initialize the xor nodes: params are {pda,buf}
	 * from {Rod,Wnd,Rop} nodes, and raidPtr
	 */
	if (numParityNodes == 2) {
		/* double-xor case */
		for (i = 0; i < numParityNodes; i++) {
			/* note: no wakeup func for xor */
			rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
				    undoFunc, NULL, 1,
				    (numDataNodes + numParityNodes),
				    7, 1, dag_h, name, allocList);
			xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
			xorNodes[i].params[0] = readDataNodes[i].params[0];
			xorNodes[i].params[1] = readDataNodes[i].params[1];
			xorNodes[i].params[2] = readParityNodes[i].params[0];
			xorNodes[i].params[3] = readParityNodes[i].params[1];
			xorNodes[i].params[4] = writeDataNodes[i].params[0];
			xorNodes[i].params[5] = writeDataNodes[i].params[1];
			xorNodes[i].params[6].p = raidPtr;
			/* use old parity buf as target buf */
			xorNodes[i].results[0] = readParityNodes[i].params[1].p;
			if (nfaults == 2) {
				/* note: no wakeup func for Q */
				rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
					    qfunc, undoFunc, NULL, 1,
					    (numDataNodes + numParityNodes),
					    7, 1, dag_h, qname, allocList);
				qNodes[i].params[0] = readDataNodes[i].params[0];
				qNodes[i].params[1] = readDataNodes[i].params[1];
				qNodes[i].params[2] = readQNodes[i].params[0];
				qNodes[i].params[3] = readQNodes[i].params[1];
				qNodes[i].params[4] = writeDataNodes[i].params[0];
				qNodes[i].params[5] = writeDataNodes[i].params[1];
				qNodes[i].params[6].p = raidPtr;
				/* use old Q buf as target buf */
				qNodes[i].results[0] = readQNodes[i].params[1].p;
			}
		}
	} else {
		/* there is only one xor node in this case */
		rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func,
			    undoFunc, NULL, 1, (numDataNodes + numParityNodes),
			    (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
			    dag_h, name, allocList);
		xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
		/*
		 * readDataNodes is contiguous with readParityNodes in the
		 * nodes array, so the i == numDataNodes iteration below
		 * picks up the Rop node's {pda, buf} pair.
		 */
		for (i = 0; i < numDataNodes + 1; i++) {
			/* set up params related to Rod and Rop nodes */
			xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
			xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer ptr */
		}
		for (i = 0; i < numDataNodes; i++) {
			/* set up params related to Wnd and Wnp nodes */
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =	/* pda */
			    writeDataNodes[i].params[0];
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =	/* buffer ptr */
			    writeDataNodes[i].params[1];
		}
		/* xor node needs to get at RAID information */
		xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
		xorNodes[0].results[0] = readParityNodes[0].params[1].p;
		if (nfaults == 2) {
			rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
				    undoFunc, NULL, 1,
				    (numDataNodes + numParityNodes),
				    (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
				    dag_h, qname, allocList);
			for (i = 0; i < numDataNodes; i++) {
				/* set up params related to Rod */
				qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
				qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer ptr */
			}
			/* and read old q */
			qNodes[0].params[2 * numDataNodes + 0] =	/* pda */
			    readQNodes[0].params[0];
			qNodes[0].params[2 * numDataNodes + 1] =	/* buffer ptr */
			    readQNodes[0].params[1];
			for (i = 0; i < numDataNodes; i++) {
				/* set up params related to Wnd nodes */
				qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =	/* pda */
				    writeDataNodes[i].params[0];
				qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =	/* buffer ptr */
				    writeDataNodes[i].params[1];
			}
			/* q node needs to get at RAID information */
			qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
			qNodes[0].results[0] = readQNodes[0].params[1].p;
		}
	}

	/* initialize nodes which write new parity (Wnp) */
	pda = asmap->parityInfo;
	for (i = 0; i < numParityNodes; i++) {
		rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
			    "Wnp", allocList);
		RF_ASSERT(pda != NULL);
		writeParityNodes[i].params[0].p = pda;
		/* buffer pointer for the parity write: the xor node's result */
		writeParityNodes[i].params[1].p = xorNodes[i].results[0];
		writeParityNodes[i].params[2].v = parityStripeID;
		writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    which_ru);
		pda = pda->next;
	}

	/* initialize nodes which write new Q (Wnq) */
	if (nfaults == 2) {
		pda = asmap->qInfo;
		for (i = 0; i < numParityNodes; i++) {
			rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
				    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
				    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
				    "Wnq", allocList);
			RF_ASSERT(pda != NULL);
			writeQNodes[i].params[0].p = pda;
			/* buffer pointer for the Q write: the q node's result */
			writeQNodes[i].params[1].p = qNodes[i].results[0];
			writeQNodes[i].params[2].v = parityStripeID;
			writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
			    which_ru);
			pda = pda->next;
		}
	}
	/*
	 * Step 4. connect the nodes.
	 */

	/* connect header to block node */
	dag_h->succedents[0] = blockNode;

	/* connect block node to read old data nodes */
	RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
	for (i = 0; i < numDataNodes; i++) {
		blockNode->succedents[i] = &readDataNodes[i];
		RF_ASSERT(readDataNodes[i].numAntecedents == 1);
		readDataNodes[i].antecedents[0] = blockNode;
		readDataNodes[i].antType[0] = rf_control;
	}

	/* connect block node to read old parity nodes */
	for (i = 0; i < numParityNodes; i++) {
		blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
		RF_ASSERT(readParityNodes[i].numAntecedents == 1);
		readParityNodes[i].antecedents[0] = blockNode;
		readParityNodes[i].antType[0] = rf_control;
	}

	/* connect block node to read old Q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
			RF_ASSERT(readQNodes[i].numAntecedents == 1);
			readQNodes[i].antecedents[0] = blockNode;
			readQNodes[i].antType[0] = rf_control;
		}
	}
	/* connect read old data nodes to xor nodes */
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
		for (j = 0; j < numParityNodes; j++) {
			RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
			readDataNodes[i].succedents[j] = &xorNodes[j];
			xorNodes[j].antecedents[i] = &readDataNodes[i];
			xorNodes[j].antType[i] = rf_trueData;
		}
	}

	/* connect read old data nodes to q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numDataNodes; i++) {
			for (j = 0; j < numParityNodes; j++) {
				RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
				readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
				qNodes[j].antecedents[i] = &readDataNodes[i];
				qNodes[j].antType[i] = rf_trueData;
			}
		}
	}
	/* connect read old parity nodes to xor nodes */
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
		for (j = 0; j < numParityNodes; j++) {
			readParityNodes[i].succedents[j] = &xorNodes[j];
			xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
			xorNodes[j].antType[numDataNodes + i] = rf_trueData;
		}
	}

	/* connect read old q nodes to q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
			for (j = 0; j < numParityNodes; j++) {
				readQNodes[i].succedents[j] = &qNodes[j];
				qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
				qNodes[j].antType[numDataNodes + i] = rf_trueData;
			}
		}
	}
	/* connect xor nodes to commit node */
	RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(xorNodes[i].numSuccedents == 1);
		xorNodes[i].succedents[0] = commitNode;
		commitNode->antecedents[i] = &xorNodes[i];
		commitNode->antType[i] = rf_control;
	}

	/* connect q nodes to commit node */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(qNodes[i].numSuccedents == 1);
			qNodes[i].succedents[0] = commitNode;
			commitNode->antecedents[i + numParityNodes] = &qNodes[i];
			commitNode->antType[i + numParityNodes] = rf_control;
		}
	}
	/* connect commit node to write nodes */
	RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
		commitNode->succedents[i] = &writeDataNodes[i];
		writeDataNodes[i].antecedents[0] = commitNode;
		writeDataNodes[i].antType[0] = rf_trueData;
	}
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
		commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
		writeParityNodes[i].antecedents[0] = commitNode;
		writeParityNodes[i].antType[0] = rf_trueData;
	}
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(writeQNodes[i].numAntecedents == 1);
			commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
			writeQNodes[i].antecedents[0] = commitNode;
			writeQNodes[i].antType[0] = rf_trueData;
		}
	}
	RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
	RF_ASSERT(termNode->numSuccedents == 0);
	for (i = 0; i < numDataNodes; i++) {
		/* connect write new data nodes to term node */
		RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
		RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
		writeDataNodes[i].succedents[0] = termNode;
		termNode->antecedents[i] = &writeDataNodes[i];
		termNode->antType[i] = rf_control;
	}

	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
		writeParityNodes[i].succedents[0] = termNode;
		termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
		termNode->antType[numDataNodes + i] = rf_control;
	}

	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(writeQNodes[i].numSuccedents == 1);
			writeQNodes[i].succedents[0] = termNode;
			termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
			termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
		}
	}
}


/******************************************************************************
 * create a write graph (fault-free or degraded) for RAID level 1
 *
 * Hdr -> Commit -> Wpd -> Nil -> Trm
 *               -> Wsd ->
 *
 * The "Wpd" node writes data to the primary copy in the mirror pair.
 * The "Wsd" node writes data to the secondary copy in the mirror pair.
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *****************************************************************************/

void
rf_CreateRaidOneWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
			 RF_DagHeader_t *dag_h, void *bp,
			 RF_RaidAccessFlags_t flags,
			 RF_AllocListElem_t *allocList)
{
	RF_DagNode_t *unblockNode, *termNode, *commitNode;
	RF_DagNode_t *nodes, *wndNode, *wmirNode;
	int nWndNodes, nWmirNodes, i;
	RF_ReconUnitNum_t which_ru;
	RF_PhysDiskAddr_t *pda, *pdaP;
	RF_StripeNum_t parityStripeID;

	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	if (rf_dagDebug) {
		printf("[Creating RAID level 1 write DAG]\n");
	}
	dag_h->creator = "RaidOneWriteDAG";

	/* 2 implies access not SU aligned */
	nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
	nWndNodes = (asmap->physInfo->next) ? 2 : 1;

	/* alloc the Wnd nodes and the Wmir node */
	if (asmap->numDataFailed == 1)
		nWndNodes--;
	if (asmap->numParityFailed == 1)
		nWmirNodes--;

	/* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock
	 * + terminator) */
	RF_MallocAndAdd(nodes,
			(nWndNodes + nWmirNodes + 3) * sizeof(RF_DagNode_t),
			(RF_DagNode_t *), allocList);
	i = 0;
	wndNode = &nodes[i];
	i += nWndNodes;
	wmirNode = &nodes[i];
	i += nWmirNodes;
	commitNode = &nodes[i];
	i += 1;
	unblockNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));

	/* this dag can commit immediately */
	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* initialize the commit, unblock, and term nodes */
	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes),
		    0, 0, 0, dag_h, "Cmt", allocList);
	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes),
		    0, 0, dag_h, "Nil", allocList);
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
		    rf_TerminateUndoFunc, NULL, 0, 1, 0, 0,
		    dag_h, "Trm", allocList);

	/* initialize the wnd nodes */
	if (nWndNodes > 0) {
		pda = asmap->physInfo;
		for (i = 0; i < nWndNodes; i++) {
			rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
				    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
				    rf_GenericWakeupFunc, 1, 1, 4, 0,
				    dag_h, "Wpd", allocList);
			RF_ASSERT(pda != NULL);
			wndNode[i].params[0].p = pda;
			wndNode[i].params[1].p = pda->bufPtr;
			wndNode[i].params[2].v = parityStripeID;
			wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
			pda = pda->next;
		}
		RF_ASSERT(pda == NULL);
	}
	/* initialize the mirror nodes */
	if (nWmirNodes > 0) {
		pda = asmap->physInfo;
		pdaP = asmap->parityInfo;
		for (i = 0; i < nWmirNodes; i++) {
			rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
				    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
				    rf_GenericWakeupFunc, 1, 1, 4, 0,
				    dag_h, "Wsd", allocList);
			RF_ASSERT(pda != NULL);
			wmirNode[i].params[0].p = pdaP;
			wmirNode[i].params[1].p = pda->bufPtr;
			wmirNode[i].params[2].v = parityStripeID;
			wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
			pda = pda->next;
			pdaP = pdaP->next;
		}
		RF_ASSERT(pda == NULL);
		RF_ASSERT(pdaP == NULL);
	}
	/* link the header node to the commit node */
	RF_ASSERT(dag_h->numSuccedents == 1);
	RF_ASSERT(commitNode->numAntecedents == 0);
	dag_h->succedents[0] = commitNode;

	/* link the commit node to the write nodes */
	RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numAntecedents == 1);
		commitNode->succedents[i] = &wndNode[i];
		wndNode[i].antecedents[0] = commitNode;
		wndNode[i].antType[0] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numAntecedents == 1);
		commitNode->succedents[i + nWndNodes] = &wmirNode[i];
		wmirNode[i].antecedents[0] = commitNode;
		wmirNode[i].antType[0] = rf_control;
	}

	/* link the write nodes to the unblock node */
	RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numSuccedents == 1);
		wndNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i] = &wndNode[i];
		unblockNode->antType[i] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numSuccedents == 1);
		wmirNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
		unblockNode->antType[i + nWndNodes] = rf_control;
	}

	/* link the unblock node to the term node */
	RF_ASSERT(unblockNode->numSuccedents == 1);
	RF_ASSERT(termNode->numAntecedents == 1);
	RF_ASSERT(termNode->numSuccedents == 0);
	unblockNode->succedents[0] = termNode;
	termNode->antecedents[0] = unblockNode;
	termNode->antType[0] = rf_control;
}