rf_dagffwr.c revision 1.19 1 /* $NetBSD: rf_dagffwr.c,v 1.19 2004/03/05 03:22:05 oster Exp $ */
2 /*
3 * Copyright (c) 1995 Carnegie-Mellon University.
4 * All rights reserved.
5 *
6 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
7 *
8 * Permission to use, copy, modify and distribute this software and
9 * its documentation is hereby granted, provided that both the copyright
10 * notice and this permission notice appear in all copies of the
11 * software, derivative works or modified versions, and any portions
12 * thereof, and that both notices appear in supporting documentation.
13 *
14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
17 *
18 * Carnegie Mellon requests users of this software to return to
19 *
20 * Software Distribution Coordinator or Software.Distribution (at) CS.CMU.EDU
21 * School of Computer Science
22 * Carnegie Mellon University
23 * Pittsburgh PA 15213-3890
24 *
25 * any improvements or extensions that they make and grant Carnegie the
26 * rights to redistribute these changes.
27 */
28
29 /*
30 * rf_dagff.c
31 *
32 * code for creating fault-free DAGs
33 *
34 */
35
36 #include <sys/cdefs.h>
37 __KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.19 2004/03/05 03:22:05 oster Exp $");
38
39 #include <dev/raidframe/raidframevar.h>
40
41 #include "rf_raid.h"
42 #include "rf_dag.h"
43 #include "rf_dagutils.h"
44 #include "rf_dagfuncs.h"
45 #include "rf_debugMem.h"
46 #include "rf_dagffrd.h"
47 #include "rf_general.h"
48 #include "rf_dagffwr.h"
49
50 /******************************************************************************
51 *
52 * General comments on DAG creation:
53 *
54 * All DAGs in this file use roll-away error recovery. Each DAG has a single
55 * commit node, usually called "Cmt." If an error occurs before the Cmt node
56 * is reached, the execution engine will halt forward execution and work
57 * backward through the graph, executing the undo functions. Assuming that
58 * each node in the graph prior to the Cmt node is undoable and atomic - or -
59 * does not make changes to permanent state, the graph will fail atomically.
60 * If an error occurs after the Cmt node executes, the engine will roll-forward
61 * through the graph, blindly executing nodes until it reaches the end.
62 * If a graph reaches the end, it is assumed to have completed successfully.
63 *
64 * A graph has only 1 Cmt node.
65 *
66 */
67
68
69 /******************************************************************************
70 *
71 * The following wrappers map the standard DAG creation interface to the
72 * DAG creation routines. Additionally, these wrappers enable experimentation
73 * with new DAG structures by providing an extra level of indirection, allowing
74 * the DAG creation routines to be replaced at this single point.
75 */
76
77
78 void
79 rf_CreateNonRedundantWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
80 RF_DagHeader_t *dag_h, void *bp,
81 RF_RaidAccessFlags_t flags,
82 RF_AllocListElem_t *allocList,
83 RF_IoType_t type)
84 {
85 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
86 RF_IO_TYPE_WRITE);
87 }
88
89 void
90 rf_CreateRAID0WriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
91 RF_DagHeader_t *dag_h, void *bp,
92 RF_RaidAccessFlags_t flags,
93 RF_AllocListElem_t *allocList,
94 RF_IoType_t type)
95 {
96 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
97 RF_IO_TYPE_WRITE);
98 }
99
100 void
101 rf_CreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
102 RF_DagHeader_t *dag_h, void *bp,
103 RF_RaidAccessFlags_t flags,
104 RF_AllocListElem_t *allocList)
105 {
106 /* "normal" rollaway */
107 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags,
108 allocList, &rf_xorFuncs, NULL);
109 }
110
111 void
112 rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
113 RF_DagHeader_t *dag_h, void *bp,
114 RF_RaidAccessFlags_t flags,
115 RF_AllocListElem_t *allocList)
116 {
117 /* "normal" rollaway */
118 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags,
119 allocList, 1, rf_RegularXorFunc, RF_TRUE);
120 }
121
122
123 /******************************************************************************
124 *
125 * DAG creation code begins here
126 */
127
128
129 /******************************************************************************
130 *
131 * creates a DAG to perform a large-write operation:
132 *
133 * / Rod \ / Wnd \
134 * H -- block- Rod - Xor - Cmt - Wnd --- T
135 * \ Rod / \ Wnp /
136 * \[Wnq]/
137 *
138 * The XOR node also does the Q calculation in the P+Q architecture.
139 * All nodes before the commit node (Cmt) are assumed to be atomic and
140 * undoable - or - they make no changes to permanent state.
141 *
142 * Rod = read old data
143 * Cmt = commit node
144 * Wnp = write new parity
145 * Wnd = write new data
146 * Wnq = write new "q"
147 * [] denotes optional segments in the graph
148 *
149 * Parameters: raidPtr - description of the physical array
150 * asmap - logical & physical addresses for this access
151 * bp - buffer ptr (holds write data)
152 * flags - general flags (e.g. disk locking)
153 * allocList - list of memory allocated in DAG creation
154 * nfaults - number of faults array can tolerate
155 * (equal to # redundancy units in stripe)
156 * redfuncs - list of redundancy generating functions
157 *
158 *****************************************************************************/
159
160 void
161 rf_CommonCreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
162 RF_DagHeader_t *dag_h, void *bp,
163 RF_RaidAccessFlags_t flags,
164 RF_AllocListElem_t *allocList,
165 int nfaults, int (*redFunc) (RF_DagNode_t *),
166 int allowBufferRecycle)
167 {
168 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
169 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
170 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
171 RF_AccessStripeMapHeader_t *new_asm_h[2];
172 RF_StripeNum_t parityStripeID;
173 char *sosBuffer, *eosBuffer;
174 RF_ReconUnitNum_t which_ru;
175 RF_RaidLayout_t *layoutPtr;
176 RF_PhysDiskAddr_t *pda;
177
178 layoutPtr = &(raidPtr->Layout);
179 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr,
180 asmap->raidAddress,
181 &which_ru);
182
183 #if RF_DEBUG_DAG
184 if (rf_dagDebug) {
185 printf("[Creating large-write DAG]\n");
186 }
187 #endif
188 dag_h->creator = "LargeWriteDAG";
189
190 dag_h->numCommitNodes = 1;
191 dag_h->numCommits = 0;
192 dag_h->numSuccedents = 1;
193
194 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
195 nWndNodes = asmap->numStripeUnitsAccessed;
196 RF_MallocAndAdd(nodes,
197 (nWndNodes + 4 + nfaults) * sizeof(RF_DagNode_t),
198 (RF_DagNode_t *), allocList);
199 i = 0;
200 wndNodes = &nodes[i];
201 i += nWndNodes;
202 xorNode = &nodes[i];
203 i += 1;
204 wnpNode = &nodes[i];
205 i += 1;
206 blockNode = &nodes[i];
207 i += 1;
208 commitNode = &nodes[i];
209 i += 1;
210 termNode = &nodes[i];
211 i += 1;
212 if (nfaults == 2) {
213 wnqNode = &nodes[i];
214 i += 1;
215 } else {
216 wnqNode = NULL;
217 }
218 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
219 new_asm_h, &nRodNodes, &sosBuffer,
220 &eosBuffer, allocList);
221 if (nRodNodes > 0) {
222 RF_MallocAndAdd(rodNodes, nRodNodes * sizeof(RF_DagNode_t),
223 (RF_DagNode_t *), allocList);
224 } else {
225 rodNodes = NULL;
226 }
227
228 /* begin node initialization */
229 if (nRodNodes > 0) {
230 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
231 rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0,
232 dag_h, "Nil", allocList);
233 } else {
234 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
235 rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0,
236 dag_h, "Nil", allocList);
237 }
238
239 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
240 rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0,
241 dag_h, "Cmt", allocList);
242 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
243 rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0,
244 dag_h, "Trm", allocList);
245
246 /* initialize the Rod nodes */
247 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
248 if (new_asm_h[asmNum]) {
249 pda = new_asm_h[asmNum]->stripeMap->physInfo;
250 while (pda) {
251 rf_InitNode(&rodNodes[nodeNum], rf_wait,
252 RF_FALSE, rf_DiskReadFunc,
253 rf_DiskReadUndoFunc,
254 rf_GenericWakeupFunc,
255 1, 1, 4, 0, dag_h,
256 "Rod", allocList);
257 rodNodes[nodeNum].params[0].p = pda;
258 rodNodes[nodeNum].params[1].p = pda->bufPtr;
259 rodNodes[nodeNum].params[2].v = parityStripeID;
260 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
261 which_ru);
262 nodeNum++;
263 pda = pda->next;
264 }
265 }
266 }
267 RF_ASSERT(nodeNum == nRodNodes);
268
269 /* initialize the wnd nodes */
270 pda = asmap->physInfo;
271 for (i = 0; i < nWndNodes; i++) {
272 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE,
273 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
274 rf_GenericWakeupFunc, 1, 1, 4, 0,
275 dag_h, "Wnd", allocList);
276 RF_ASSERT(pda != NULL);
277 wndNodes[i].params[0].p = pda;
278 wndNodes[i].params[1].p = pda->bufPtr;
279 wndNodes[i].params[2].v = parityStripeID;
280 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
281 pda = pda->next;
282 }
283
284 /* initialize the redundancy node */
285 if (nRodNodes > 0) {
286 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
287 rf_NullNodeUndoFunc, NULL, 1,
288 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1,
289 nfaults, dag_h, "Xr ", allocList);
290 } else {
291 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
292 rf_NullNodeUndoFunc, NULL, 1,
293 1, 2 * (nWndNodes + nRodNodes) + 1,
294 nfaults, dag_h, "Xr ", allocList);
295 }
296 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
297 for (i = 0; i < nWndNodes; i++) {
298 /* pda */
299 xorNode->params[2 * i + 0] = wndNodes[i].params[0];
300 /* buf ptr */
301 xorNode->params[2 * i + 1] = wndNodes[i].params[1];
302 }
303 for (i = 0; i < nRodNodes; i++) {
304 /* pda */
305 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0];
306 /* buf ptr */
307 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1];
308 }
309 /* xor node needs to get at RAID information */
310 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
311
312 /*
313 * Look for an Rod node that reads a complete SU. If none,
314 * alloc a buffer to receive the parity info. Note that we
315 * can't use a new data buffer because it will not have gotten
316 * written when the xor occurs. */
317 if (allowBufferRecycle) {
318 for (i = 0; i < nRodNodes; i++) {
319 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
320 break;
321 }
322 }
323 if ((!allowBufferRecycle) || (i == nRodNodes)) {
324 RF_MallocAndAdd(xorNode->results[0],
325 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
326 (void *), allocList);
327 } else {
328 xorNode->results[0] = rodNodes[i].params[1].p;
329 }
330
331 /* initialize the Wnp node */
332 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
333 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
334 dag_h, "Wnp", allocList);
335 wnpNode->params[0].p = asmap->parityInfo;
336 wnpNode->params[1].p = xorNode->results[0];
337 wnpNode->params[2].v = parityStripeID;
338 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
339 /* parityInfo must describe entire parity unit */
340 RF_ASSERT(asmap->parityInfo->next == NULL);
341
342 if (nfaults == 2) {
343 /*
344 * We never try to recycle a buffer for the Q calcuation
345 * in addition to the parity. This would cause two buffers
346 * to get smashed during the P and Q calculation, guaranteeing
347 * one would be wrong.
348 */
349 RF_MallocAndAdd(xorNode->results[1],
350 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
351 (void *), allocList);
352 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
353 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc,
354 1, 1, 4, 0, dag_h, "Wnq", allocList);
355 wnqNode->params[0].p = asmap->qInfo;
356 wnqNode->params[1].p = xorNode->results[1];
357 wnqNode->params[2].v = parityStripeID;
358 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
359 /* parityInfo must describe entire parity unit */
360 RF_ASSERT(asmap->parityInfo->next == NULL);
361 }
362 /*
363 * Connect nodes to form graph.
364 */
365
366 /* connect dag header to block node */
367 RF_ASSERT(blockNode->numAntecedents == 0);
368 dag_h->succedents[0] = blockNode;
369
370 if (nRodNodes > 0) {
371 /* connect the block node to the Rod nodes */
372 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
373 RF_ASSERT(xorNode->numAntecedents == nRodNodes);
374 for (i = 0; i < nRodNodes; i++) {
375 RF_ASSERT(rodNodes[i].numAntecedents == 1);
376 blockNode->succedents[i] = &rodNodes[i];
377 rodNodes[i].antecedents[0] = blockNode;
378 rodNodes[i].antType[0] = rf_control;
379
380 /* connect the Rod nodes to the Xor node */
381 RF_ASSERT(rodNodes[i].numSuccedents == 1);
382 rodNodes[i].succedents[0] = xorNode;
383 xorNode->antecedents[i] = &rodNodes[i];
384 xorNode->antType[i] = rf_trueData;
385 }
386 } else {
387 /* connect the block node to the Xor node */
388 RF_ASSERT(blockNode->numSuccedents == 1);
389 RF_ASSERT(xorNode->numAntecedents == 1);
390 blockNode->succedents[0] = xorNode;
391 xorNode->antecedents[0] = blockNode;
392 xorNode->antType[0] = rf_control;
393 }
394
395 /* connect the xor node to the commit node */
396 RF_ASSERT(xorNode->numSuccedents == 1);
397 RF_ASSERT(commitNode->numAntecedents == 1);
398 xorNode->succedents[0] = commitNode;
399 commitNode->antecedents[0] = xorNode;
400 commitNode->antType[0] = rf_control;
401
402 /* connect the commit node to the write nodes */
403 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
404 for (i = 0; i < nWndNodes; i++) {
405 RF_ASSERT(wndNodes->numAntecedents == 1);
406 commitNode->succedents[i] = &wndNodes[i];
407 wndNodes[i].antecedents[0] = commitNode;
408 wndNodes[i].antType[0] = rf_control;
409 }
410 RF_ASSERT(wnpNode->numAntecedents == 1);
411 commitNode->succedents[nWndNodes] = wnpNode;
412 wnpNode->antecedents[0] = commitNode;
413 wnpNode->antType[0] = rf_trueData;
414 if (nfaults == 2) {
415 RF_ASSERT(wnqNode->numAntecedents == 1);
416 commitNode->succedents[nWndNodes + 1] = wnqNode;
417 wnqNode->antecedents[0] = commitNode;
418 wnqNode->antType[0] = rf_trueData;
419 }
420 /* connect the write nodes to the term node */
421 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
422 RF_ASSERT(termNode->numSuccedents == 0);
423 for (i = 0; i < nWndNodes; i++) {
424 RF_ASSERT(wndNodes->numSuccedents == 1);
425 wndNodes[i].succedents[0] = termNode;
426 termNode->antecedents[i] = &wndNodes[i];
427 termNode->antType[i] = rf_control;
428 }
429 RF_ASSERT(wnpNode->numSuccedents == 1);
430 wnpNode->succedents[0] = termNode;
431 termNode->antecedents[nWndNodes] = wnpNode;
432 termNode->antType[nWndNodes] = rf_control;
433 if (nfaults == 2) {
434 RF_ASSERT(wnqNode->numSuccedents == 1);
435 wnqNode->succedents[0] = termNode;
436 termNode->antecedents[nWndNodes + 1] = wnqNode;
437 termNode->antType[nWndNodes + 1] = rf_control;
438 }
439 }
440 /******************************************************************************
441 *
442 * creates a DAG to perform a small-write operation (either raid 5 or pq),
443 * which is as follows:
444 *
445 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
446 * \- Rod X / \----> Wnd [Und]-/
447 * [\- Rod X / \---> Wnd [Und]-/]
448 * [\- Roq -> Q / \--> Wnq [Unq]-/]
449 *
450 * Rop = read old parity
451 * Rod = read old data
452 * Roq = read old "q"
453 * Cmt = commit node
454 * Und = unlock data disk
455 * Unp = unlock parity disk
456 * Unq = unlock q disk
457 * Wnp = write new parity
458 * Wnd = write new data
459 * Wnq = write new "q"
460 * [ ] denotes optional segments in the graph
461 *
462 * Parameters: raidPtr - description of the physical array
463 * asmap - logical & physical addresses for this access
464 * bp - buffer ptr (holds write data)
465 * flags - general flags (e.g. disk locking)
466 * allocList - list of memory allocated in DAG creation
467 * pfuncs - list of parity generating functions
468 * qfuncs - list of q generating functions
469 *
470 * A null qfuncs indicates single fault tolerant
471 *****************************************************************************/
472
473 void
474 rf_CommonCreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
475 RF_DagHeader_t *dag_h, void *bp,
476 RF_RaidAccessFlags_t flags,
477 RF_AllocListElem_t *allocList,
478 const RF_RedFuncs_t *pfuncs,
479 const RF_RedFuncs_t *qfuncs)
480 {
481 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
482 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
483 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
484 int i, j, nNodes, totalNumNodes;
485 RF_ReconUnitNum_t which_ru;
486 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
487 int (*qfunc) (RF_DagNode_t *);
488 int numDataNodes, numParityNodes;
489 RF_StripeNum_t parityStripeID;
490 RF_PhysDiskAddr_t *pda;
491 char *name, *qname;
492 long nfaults;
493
494 nfaults = qfuncs ? 2 : 1;
495
496 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
497 asmap->raidAddress, &which_ru);
498 pda = asmap->physInfo;
499 numDataNodes = asmap->numStripeUnitsAccessed;
500 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
501
502 #if RF_DEBUG_DAG
503 if (rf_dagDebug) {
504 printf("[Creating small-write DAG]\n");
505 }
506 #endif
507 RF_ASSERT(numDataNodes > 0);
508 dag_h->creator = "SmallWriteDAG";
509
510 dag_h->numCommitNodes = 1;
511 dag_h->numCommits = 0;
512 dag_h->numSuccedents = 1;
513
514 /*
515 * DAG creation occurs in four steps:
516 * 1. count the number of nodes in the DAG
517 * 2. create the nodes
518 * 3. initialize the nodes
519 * 4. connect the nodes
520 */
521
522 /*
523 * Step 1. compute number of nodes in the graph
524 */
525
526 /* number of nodes: a read and write for each data unit a
527 * redundancy computation node for each parity node (nfaults *
528 * nparity) a read and write for each parity unit a block and
529 * commit node (2) a terminate node if atomic RMW an unlock
530 * node for each data unit, redundancy unit */
531 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
532 + (nfaults * 2 * numParityNodes) + 3;
533 /*
534 * Step 2. create the nodes
535 */
536 RF_MallocAndAdd(nodes, totalNumNodes * sizeof(RF_DagNode_t),
537 (RF_DagNode_t *), allocList);
538 i = 0;
539 blockNode = &nodes[i];
540 i += 1;
541 commitNode = &nodes[i];
542 i += 1;
543 readDataNodes = &nodes[i];
544 i += numDataNodes;
545 readParityNodes = &nodes[i];
546 i += numParityNodes;
547 writeDataNodes = &nodes[i];
548 i += numDataNodes;
549 writeParityNodes = &nodes[i];
550 i += numParityNodes;
551 xorNodes = &nodes[i];
552 i += numParityNodes;
553 termNode = &nodes[i];
554 i += 1;
555
556 if (nfaults == 2) {
557 readQNodes = &nodes[i];
558 i += numParityNodes;
559 writeQNodes = &nodes[i];
560 i += numParityNodes;
561 qNodes = &nodes[i];
562 i += numParityNodes;
563 } else {
564 readQNodes = writeQNodes = qNodes = NULL;
565 }
566 RF_ASSERT(i == totalNumNodes);
567
568 /*
569 * Step 3. initialize the nodes
570 */
571 /* initialize block node (Nil) */
572 nNodes = numDataNodes + (nfaults * numParityNodes);
573 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
574 rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0,
575 dag_h, "Nil", allocList);
576
577 /* initialize commit node (Cmt) */
578 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
579 rf_NullNodeUndoFunc, NULL, nNodes,
580 (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);
581
582 /* initialize terminate node (Trm) */
583 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
584 rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0,
585 dag_h, "Trm", allocList);
586
587 /* initialize nodes which read old data (Rod) */
588 for (i = 0; i < numDataNodes; i++) {
589 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
590 rf_DiskReadFunc, rf_DiskReadUndoFunc,
591 rf_GenericWakeupFunc, (nfaults * numParityNodes),
592 1, 4, 0, dag_h, "Rod", allocList);
593 RF_ASSERT(pda != NULL);
594 /* physical disk addr desc */
595 readDataNodes[i].params[0].p = pda;
596 /* buffer to hold old data */
597 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
598 dag_h, pda, allocList);
599 readDataNodes[i].params[2].v = parityStripeID;
600 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
601 which_ru);
602 pda = pda->next;
603 for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
604 readDataNodes[i].propList[j] = NULL;
605 }
606 }
607
608 /* initialize nodes which read old parity (Rop) */
609 pda = asmap->parityInfo;
610 i = 0;
611 for (i = 0; i < numParityNodes; i++) {
612 RF_ASSERT(pda != NULL);
613 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
614 rf_DiskReadFunc, rf_DiskReadUndoFunc,
615 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0,
616 dag_h, "Rop", allocList);
617 readParityNodes[i].params[0].p = pda;
618 /* buffer to hold old parity */
619 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
620 dag_h, pda, allocList);
621 readParityNodes[i].params[2].v = parityStripeID;
622 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
623 which_ru);
624 pda = pda->next;
625 for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
626 readParityNodes[i].propList[0] = NULL;
627 }
628 }
629
630 /* initialize nodes which read old Q (Roq) */
631 if (nfaults == 2) {
632 pda = asmap->qInfo;
633 for (i = 0; i < numParityNodes; i++) {
634 RF_ASSERT(pda != NULL);
635 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
636 rf_DiskReadFunc, rf_DiskReadUndoFunc,
637 rf_GenericWakeupFunc, numParityNodes,
638 1, 4, 0, dag_h, "Roq", allocList);
639 readQNodes[i].params[0].p = pda;
640 /* buffer to hold old Q */
641 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
642 dag_h, pda,
643 allocList);
644 readQNodes[i].params[2].v = parityStripeID;
645 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
646 which_ru);
647 pda = pda->next;
648 for (j = 0; j < readQNodes[i].numSuccedents; j++) {
649 readQNodes[i].propList[0] = NULL;
650 }
651 }
652 }
653 /* initialize nodes which write new data (Wnd) */
654 pda = asmap->physInfo;
655 for (i = 0; i < numDataNodes; i++) {
656 RF_ASSERT(pda != NULL);
657 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
658 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
659 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
660 "Wnd", allocList);
661 /* physical disk addr desc */
662 writeDataNodes[i].params[0].p = pda;
663 /* buffer holding new data to be written */
664 writeDataNodes[i].params[1].p = pda->bufPtr;
665 writeDataNodes[i].params[2].v = parityStripeID;
666 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
667 which_ru);
668 pda = pda->next;
669 }
670
671 /*
672 * Initialize nodes which compute new parity and Q.
673 */
674 /*
675 * We use the simple XOR func in the double-XOR case, and when
676 * we're accessing only a portion of one stripe unit. The
677 * distinction between the two is that the regular XOR func
678 * assumes that the targbuf is a full SU in size, and examines
679 * the pda associated with the buffer to decide where within
680 * the buffer to XOR the data, whereas the simple XOR func
681 * just XORs the data into the start of the buffer. */
682 if ((numParityNodes == 2) || ((numDataNodes == 1)
683 && (asmap->totalSectorsAccessed <
684 raidPtr->Layout.sectorsPerStripeUnit))) {
685 func = pfuncs->simple;
686 undoFunc = rf_NullNodeUndoFunc;
687 name = pfuncs->SimpleName;
688 if (qfuncs) {
689 qfunc = qfuncs->simple;
690 qname = qfuncs->SimpleName;
691 } else {
692 qfunc = NULL;
693 qname = NULL;
694 }
695 } else {
696 func = pfuncs->regular;
697 undoFunc = rf_NullNodeUndoFunc;
698 name = pfuncs->RegularName;
699 if (qfuncs) {
700 qfunc = qfuncs->regular;
701 qname = qfuncs->RegularName;
702 } else {
703 qfunc = NULL;
704 qname = NULL;
705 }
706 }
707 /*
708 * Initialize the xor nodes: params are {pda,buf}
709 * from {Rod,Wnd,Rop} nodes, and raidPtr
710 */
711 if (numParityNodes == 2) {
712 /* double-xor case */
713 for (i = 0; i < numParityNodes; i++) {
714 /* note: no wakeup func for xor */
715 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
716 undoFunc, NULL, 1,
717 (numDataNodes + numParityNodes),
718 7, 1, dag_h, name, allocList);
719 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
720 xorNodes[i].params[0] = readDataNodes[i].params[0];
721 xorNodes[i].params[1] = readDataNodes[i].params[1];
722 xorNodes[i].params[2] = readParityNodes[i].params[0];
723 xorNodes[i].params[3] = readParityNodes[i].params[1];
724 xorNodes[i].params[4] = writeDataNodes[i].params[0];
725 xorNodes[i].params[5] = writeDataNodes[i].params[1];
726 xorNodes[i].params[6].p = raidPtr;
727 /* use old parity buf as target buf */
728 xorNodes[i].results[0] = readParityNodes[i].params[1].p;
729 if (nfaults == 2) {
730 /* note: no wakeup func for qor */
731 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
732 qfunc, undoFunc, NULL, 1,
733 (numDataNodes + numParityNodes),
734 7, 1, dag_h, qname, allocList);
735 qNodes[i].params[0] = readDataNodes[i].params[0];
736 qNodes[i].params[1] = readDataNodes[i].params[1];
737 qNodes[i].params[2] = readQNodes[i].params[0];
738 qNodes[i].params[3] = readQNodes[i].params[1];
739 qNodes[i].params[4] = writeDataNodes[i].params[0];
740 qNodes[i].params[5] = writeDataNodes[i].params[1];
741 qNodes[i].params[6].p = raidPtr;
742 /* use old Q buf as target buf */
743 qNodes[i].results[0] = readQNodes[i].params[1].p;
744 }
745 }
746 } else {
747 /* there is only one xor node in this case */
748 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func,
749 undoFunc, NULL, 1, (numDataNodes + numParityNodes),
750 (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
751 dag_h, name, allocList);
752 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
753 for (i = 0; i < numDataNodes + 1; i++) {
754 /* set up params related to Rod and Rop nodes */
755 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
756 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */
757 }
758 for (i = 0; i < numDataNodes; i++) {
759 /* set up params related to Wnd and Wnp nodes */
760 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */
761 writeDataNodes[i].params[0];
762 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */
763 writeDataNodes[i].params[1];
764 }
765 /* xor node needs to get at RAID information */
766 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
767 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
768 if (nfaults == 2) {
769 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
770 undoFunc, NULL, 1,
771 (numDataNodes + numParityNodes),
772 (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
773 dag_h, qname, allocList);
774 for (i = 0; i < numDataNodes; i++) {
775 /* set up params related to Rod */
776 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
777 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */
778 }
779 /* and read old q */
780 qNodes[0].params[2 * numDataNodes + 0] = /* pda */
781 readQNodes[0].params[0];
782 qNodes[0].params[2 * numDataNodes + 1] = /* buffer ptr */
783 readQNodes[0].params[1];
784 for (i = 0; i < numDataNodes; i++) {
785 /* set up params related to Wnd nodes */
786 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */
787 writeDataNodes[i].params[0];
788 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */
789 writeDataNodes[i].params[1];
790 }
791 /* xor node needs to get at RAID information */
792 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
793 qNodes[0].results[0] = readQNodes[0].params[1].p;
794 }
795 }
796
797 /* initialize nodes which write new parity (Wnp) */
798 pda = asmap->parityInfo;
799 for (i = 0; i < numParityNodes; i++) {
800 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
801 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
802 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
803 "Wnp", allocList);
804 RF_ASSERT(pda != NULL);
805 writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr)
806 * filled in by xor node */
807 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for
808 * parity write
809 * operation */
810 writeParityNodes[i].params[2].v = parityStripeID;
811 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
812 which_ru);
813 pda = pda->next;
814 }
815
816 /* initialize nodes which write new Q (Wnq) */
817 if (nfaults == 2) {
818 pda = asmap->qInfo;
819 for (i = 0; i < numParityNodes; i++) {
820 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
821 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
822 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
823 "Wnq", allocList);
824 RF_ASSERT(pda != NULL);
825 writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr)
826 * filled in by xor node */
827 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for
828 * parity write
829 * operation */
830 writeQNodes[i].params[2].v = parityStripeID;
831 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
832 which_ru);
833 pda = pda->next;
834 }
835 }
836 /*
837 * Step 4. connect the nodes.
838 */
839
840 /* connect header to block node */
841 dag_h->succedents[0] = blockNode;
842
843 /* connect block node to read old data nodes */
844 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
845 for (i = 0; i < numDataNodes; i++) {
846 blockNode->succedents[i] = &readDataNodes[i];
847 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
848 readDataNodes[i].antecedents[0] = blockNode;
849 readDataNodes[i].antType[0] = rf_control;
850 }
851
852 /* connect block node to read old parity nodes */
853 for (i = 0; i < numParityNodes; i++) {
854 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
855 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
856 readParityNodes[i].antecedents[0] = blockNode;
857 readParityNodes[i].antType[0] = rf_control;
858 }
859
860 /* connect block node to read old Q nodes */
861 if (nfaults == 2) {
862 for (i = 0; i < numParityNodes; i++) {
863 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
864 RF_ASSERT(readQNodes[i].numAntecedents == 1);
865 readQNodes[i].antecedents[0] = blockNode;
866 readQNodes[i].antType[0] = rf_control;
867 }
868 }
869 /* connect read old data nodes to xor nodes */
870 for (i = 0; i < numDataNodes; i++) {
871 RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
872 for (j = 0; j < numParityNodes; j++) {
873 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
874 readDataNodes[i].succedents[j] = &xorNodes[j];
875 xorNodes[j].antecedents[i] = &readDataNodes[i];
876 xorNodes[j].antType[i] = rf_trueData;
877 }
878 }
879
880 /* connect read old data nodes to q nodes */
881 if (nfaults == 2) {
882 for (i = 0; i < numDataNodes; i++) {
883 for (j = 0; j < numParityNodes; j++) {
884 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
885 readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
886 qNodes[j].antecedents[i] = &readDataNodes[i];
887 qNodes[j].antType[i] = rf_trueData;
888 }
889 }
890 }
891 /* connect read old parity nodes to xor nodes */
892 for (i = 0; i < numParityNodes; i++) {
893 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
894 for (j = 0; j < numParityNodes; j++) {
895 readParityNodes[i].succedents[j] = &xorNodes[j];
896 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
897 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
898 }
899 }
900
901 /* connect read old q nodes to q nodes */
902 if (nfaults == 2) {
903 for (i = 0; i < numParityNodes; i++) {
904 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
905 for (j = 0; j < numParityNodes; j++) {
906 readQNodes[i].succedents[j] = &qNodes[j];
907 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
908 qNodes[j].antType[numDataNodes + i] = rf_trueData;
909 }
910 }
911 }
912 /* connect xor nodes to commit node */
913 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
914 for (i = 0; i < numParityNodes; i++) {
915 RF_ASSERT(xorNodes[i].numSuccedents == 1);
916 xorNodes[i].succedents[0] = commitNode;
917 commitNode->antecedents[i] = &xorNodes[i];
918 commitNode->antType[i] = rf_control;
919 }
920
921 /* connect q nodes to commit node */
922 if (nfaults == 2) {
923 for (i = 0; i < numParityNodes; i++) {
924 RF_ASSERT(qNodes[i].numSuccedents == 1);
925 qNodes[i].succedents[0] = commitNode;
926 commitNode->antecedents[i + numParityNodes] = &qNodes[i];
927 commitNode->antType[i + numParityNodes] = rf_control;
928 }
929 }
930 /* connect commit node to write nodes */
931 RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
932 for (i = 0; i < numDataNodes; i++) {
933 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
934 commitNode->succedents[i] = &writeDataNodes[i];
935 writeDataNodes[i].antecedents[0] = commitNode;
936 writeDataNodes[i].antType[0] = rf_trueData;
937 }
938 for (i = 0; i < numParityNodes; i++) {
939 RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
940 commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
941 writeParityNodes[i].antecedents[0] = commitNode;
942 writeParityNodes[i].antType[0] = rf_trueData;
943 }
944 if (nfaults == 2) {
945 for (i = 0; i < numParityNodes; i++) {
946 RF_ASSERT(writeQNodes[i].numAntecedents == 1);
947 commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
948 writeQNodes[i].antecedents[0] = commitNode;
949 writeQNodes[i].antType[0] = rf_trueData;
950 }
951 }
952 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
953 RF_ASSERT(termNode->numSuccedents == 0);
954 for (i = 0; i < numDataNodes; i++) {
955 /* connect write new data nodes to term node */
956 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
957 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
958 writeDataNodes[i].succedents[0] = termNode;
959 termNode->antecedents[i] = &writeDataNodes[i];
960 termNode->antType[i] = rf_control;
961 }
962
963 for (i = 0; i < numParityNodes; i++) {
964 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
965 writeParityNodes[i].succedents[0] = termNode;
966 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
967 termNode->antType[numDataNodes + i] = rf_control;
968 }
969
970 if (nfaults == 2) {
971 for (i = 0; i < numParityNodes; i++) {
972 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
973 writeQNodes[i].succedents[0] = termNode;
974 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
975 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
976 }
977 }
978 }
979
980
981 /******************************************************************************
982 * create a write graph (fault-free or degraded) for RAID level 1
983 *
984 * Hdr -> Commit -> Wpd -> Nil -> Trm
985 * -> Wsd ->
986 *
987 * The "Wpd" node writes data to the primary copy in the mirror pair
988 * The "Wsd" node writes data to the secondary copy in the mirror pair
989 *
990 * Parameters: raidPtr - description of the physical array
991 * asmap - logical & physical addresses for this access
992 * bp - buffer ptr (holds write data)
993 * flags - general flags (e.g. disk locking)
994 * allocList - list of memory allocated in DAG creation
995 *****************************************************************************/
996
void
rf_CreateRaidOneWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
			 RF_DagHeader_t *dag_h, void *bp,
			 RF_RaidAccessFlags_t flags,
			 RF_AllocListElem_t *allocList)
{
	RF_DagNode_t *unblockNode, *termNode, *commitNode;
	RF_DagNode_t *nodes, *wndNode, *wmirNode;
	int nWndNodes, nWmirNodes, i;
	RF_ReconUnitNum_t which_ru;
	RF_PhysDiskAddr_t *pda, *pdaP;
	RF_StripeNum_t parityStripeID;

	/*
	 * NOTE: bp and flags are not referenced in this routine; each write
	 * node takes its data buffer from the PDA's bufPtr instead.
	 *
	 * Identify the parity stripe (and reconstruction unit) covered by
	 * this access -- both values are stamped into every write node's
	 * parameter list below.
	 */
	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
#if RF_DEBUG_DAG
	if (rf_dagDebug) {
		printf("[Creating RAID level 1 write DAG]\n");
	}
#endif
	dag_h->creator = "RaidOneWriteDAG";

	/*
	 * 2 implies access not SU aligned: an unaligned access maps to two
	 * physical disk addresses per copy (pda->next != NULL), and thus
	 * needs two write nodes for that copy.
	 */
	nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
	nWndNodes = (asmap->physInfo->next) ? 2 : 1;

	/* alloc the Wnd nodes and the Wmir node */
	/* degraded mode: drop the write aimed at whichever copy has failed */
	if (asmap->numDataFailed == 1)
		nWndNodes--;
	if (asmap->numParityFailed == 1)
		nWmirNodes--;

	/* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock
	 * + terminator) */
	RF_MallocAndAdd(nodes,
	    (nWndNodes + nWmirNodes + 3) * sizeof(RF_DagNode_t),
	    (RF_DagNode_t *), allocList);
	/* carve the single allocation into the per-role node arrays */
	i = 0;
	wndNode = &nodes[i];
	i += nWndNodes;
	wmirNode = &nodes[i];
	i += nWmirNodes;
	commitNode = &nodes[i];
	i += 1;
	unblockNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));

	/* this dag can commit immediately */
	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/*
	 * initialize the commit, unblock, and term nodes
	 *
	 * rf_InitNode argument order here is: node, initial state, commit
	 * flag, do/undo/wakeup funcs, then numSuccedents, numAntecedents,
	 * numParams, numResults (the succedent/antecedent counts are
	 * cross-checked by the RF_ASSERTs during wiring below).
	 */
	/* Cmt: the single commit point; fans out to every write node */
	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
	    rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes),
	    0, 0, 0, dag_h, "Cmt", allocList);
	/* Nil: barrier that collects completion of all write nodes */
	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
	    rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes),
	    0, 0, dag_h, "Nil", allocList);
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
	    rf_TerminateUndoFunc, NULL, 0, 1, 0, 0,
	    dag_h, "Trm", allocList);

	/* initialize the wnd nodes: "Wpd" writes to the primary copy,
	 * one node per PDA in the physInfo chain */
	if (nWndNodes > 0) {
		pda = asmap->physInfo;
		for (i = 0; i < nWndNodes; i++) {
			rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0,
			    dag_h, "Wpd", allocList);
			RF_ASSERT(pda != NULL);
			wndNode[i].params[0].p = pda;		/* target address */
			wndNode[i].params[1].p = pda->bufPtr;	/* write data */
			wndNode[i].params[2].v = parityStripeID;
			wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
			pda = pda->next;
		}
		/* the PDA chain must be exactly nWndNodes long */
		RF_ASSERT(pda == NULL);
	}
	/* initialize the mirror nodes: "Wsd" writes the same data to the
	 * secondary copy, one node per PDA in the parityInfo chain */
	if (nWmirNodes > 0) {
		pda = asmap->physInfo;
		pdaP = asmap->parityInfo;
		for (i = 0; i < nWmirNodes; i++) {
			rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0,
			    dag_h, "Wsd", allocList);
			RF_ASSERT(pda != NULL);
			/*
			 * The mirror write's target address comes from the
			 * parity map (pdaP), but the data buffer is the same
			 * one the corresponding primary write uses
			 * (pda->bufPtr) -- the two chains are walked in
			 * lockstep.
			 */
			wmirNode[i].params[0].p = pdaP;
			wmirNode[i].params[1].p = pda->bufPtr;
			wmirNode[i].params[2].v = parityStripeID;
			wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
			pda = pda->next;
			pdaP = pdaP->next;
		}
		RF_ASSERT(pda == NULL);
		RF_ASSERT(pdaP == NULL);
	}
	/* link the header node to the commit node */
	RF_ASSERT(dag_h->numSuccedents == 1);
	RF_ASSERT(commitNode->numAntecedents == 0);
	dag_h->succedents[0] = commitNode;

	/* link the commit node to the write nodes: Wpd nodes occupy
	 * succedent slots [0, nWndNodes), Wsd nodes follow them */
	RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numAntecedents == 1);
		commitNode->succedents[i] = &wndNode[i];
		wndNode[i].antecedents[0] = commitNode;
		wndNode[i].antType[0] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numAntecedents == 1);
		commitNode->succedents[i + nWndNodes] = &wmirNode[i];
		wmirNode[i].antecedents[0] = commitNode;
		wmirNode[i].antType[0] = rf_control;
	}

	/* link the write nodes to the unblock node (same slot layout:
	 * Wpd first, then Wsd) */
	RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numSuccedents == 1);
		wndNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i] = &wndNode[i];
		unblockNode->antType[i] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numSuccedents == 1);
		wmirNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
		unblockNode->antType[i + nWndNodes] = rf_control;
	}

	/* link the unblock node to the term node */
	RF_ASSERT(unblockNode->numSuccedents == 1);
	RF_ASSERT(termNode->numAntecedents == 1);
	RF_ASSERT(termNode->numSuccedents == 0);
	unblockNode->succedents[0] = termNode;
	termNode->antecedents[0] = unblockNode;
	termNode->antType[0] = rf_control;
}
1143