/*	$NetBSD: rf_dagffwr.c,v 1.5.6.2 2001/11/14 19:15:47 nathanw Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/*
 * rf_dagffwr.c
 *
 * code for creating fault-free write DAGs
 *
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.5.6.2 2001/11/14 19:15:47 nathanw Exp $");

#include <dev/raidframe/raidframevar.h>

#include "rf_raid.h"
#include "rf_dag.h"
#include "rf_dagutils.h"
#include "rf_dagfuncs.h"
#include "rf_debugMem.h"
#include "rf_dagffrd.h"
#include "rf_memchunk.h"
#include "rf_general.h"
#include "rf_dagffwr.h"

/******************************************************************************
 *
 * General comments on DAG creation:
 *
 * All DAGs in this file use roll-away error recovery.  Each DAG has a single
 * commit node, usually called "Cmt."  If an error occurs before the Cmt node
 * is reached, the execution engine will halt forward execution and work
 * backward through the graph, executing the undo functions.  Provided that
 * every node prior to the Cmt node is either undoable and atomic, or makes
 * no changes to permanent state, the graph will fail atomically.  If an
 * error occurs after the Cmt node executes, the engine will roll forward
 * through the graph, blindly executing nodes until it reaches the end.
 * If a graph reaches the end, it is assumed to have completed successfully.
 *
 * A graph has only 1 Cmt node.
 *
 */
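
/*
 * Illustrative sketch only (never compiled, see #if 0): the roll-away
 * semantics described above, assuming a node list in execution order with
 * per-node do/undo functions.  The struct and function names here are
 * hypothetical, not RAIDframe's.
 */
#if 0
struct example_node {
	int   (*doit)(void *);		/* forward execution */
	void  (*undo)(void *);		/* compensating action */
	void   *arg;
	int     is_commit;		/* the single Cmt node */
};

static int
example_execute(struct example_node *n, int nnodes)
{
	int i, committed = 0;

	for (i = 0; i < nnodes; i++) {
		if (n[i].doit(n[i].arg) != 0 && !committed) {
			/* error before Cmt: roll backward, undoing */
			while (--i >= 0)
				n[i].undo(n[i].arg);
			return (-1);	/* graph failed atomically */
		}
		/* after Cmt, errors are ignored and we roll forward */
		if (n[i].is_commit)
			committed = 1;
	}
	return (0);			/* reached the end: success */
}
#endif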


/******************************************************************************
 *
 * The following wrappers map the standard DAG creation interface to the
 * DAG creation routines.  Additionally, these wrappers enable experimentation
 * with new DAG structures by providing an extra level of indirection, allowing
 * the DAG creation routines to be replaced at this single point.
 */


void
rf_CreateNonRedundantWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    RF_IoType_t type)
{
	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
	    RF_IO_TYPE_WRITE);
}

void
rf_CreateRAID0WriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    RF_IoType_t type)
{
	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
	    RF_IO_TYPE_WRITE);
}

void
rf_CreateSmallWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList)
{
	/* "normal" rollaway */
	rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
	    &rf_xorFuncs, NULL);
}

void
rf_CreateLargeWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList)
{
	/* "normal" rollaway */
	rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
	    1, rf_RegularXorFunc, RF_TRUE);
}


/******************************************************************************
 *
 * DAG creation code begins here
 */


/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *           / Rod \           / Wnd \
 * H -- block- Rod - Xor - Cmt - Wnd --- T
 *           \ Rod /           \ Wnp /
 *                              \[Wnq]/
 *
 * The XOR node also does the Q calculation in the P+Q architecture.
 * All nodes before the commit node (Cmt) are assumed to be atomic and
 * undoable, or to make no changes to permanent state.
 *
 * Rod = read old data
 * Cmt = commit node
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              nfaults   - number of faults array can tolerate
 *                          (equal to # redundancy units in stripe)
 *              redfuncs  - list of redundancy generating functions
 *
 *****************************************************************************/
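
/*
 * A note on the rf_InitNode() calls below, inferred from their use
 * throughout this file: after the node pointer, the initial state, the
 * commit flag, and the do/undo/wakeup functions, the four integers give
 * the number of successors, antecedents, params, and results, followed
 * by the DAG header, the node's name, and the allocation list.
 */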

void
rf_CommonCreateLargeWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    int nfaults,
    int (*redFunc) (RF_DagNode_t *),
    int allowBufferRecycle)
{
	RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
	RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
	int nWndNodes, nRodNodes, i, nodeNum, asmNum;
	RF_AccessStripeMapHeader_t *new_asm_h[2];
	RF_StripeNum_t parityStripeID;
	char *sosBuffer, *eosBuffer;
	RF_ReconUnitNum_t which_ru;
	RF_RaidLayout_t *layoutPtr;
	RF_PhysDiskAddr_t *pda;

	layoutPtr = &(raidPtr->Layout);
	parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress,
	    &which_ru);

	if (rf_dagDebug) {
		printf("[Creating large-write DAG]\n");
	}
	dag_h->creator = "LargeWriteDAG";

	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
	nWndNodes = asmap->numStripeUnitsAccessed;
	RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
	    (RF_DagNode_t *), allocList);
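	/*
	 * Worked example (illustrative): a full-stripe write on a 4+1
	 * RAID-5 set has nWndNodes = 4 and nfaults = 1, so 9 nodes are
	 * allocated here: 4 Wnd plus Xor, Wnp, block, commit, and term;
	 * nRodNodes is 0 in that case, so no Rod nodes are added below.
	 */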
	i = 0;
	wndNodes = &nodes[i];
	i += nWndNodes;
	xorNode = &nodes[i];
	i += 1;
	wnpNode = &nodes[i];
	i += 1;
	blockNode = &nodes[i];
	i += 1;
	commitNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	if (nfaults == 2) {
		wnqNode = &nodes[i];
		i += 1;
	} else {
		wnqNode = NULL;
	}
	rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h,
	    &nRodNodes, &sosBuffer, &eosBuffer, allocList);
	if (nRodNodes > 0) {
		RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
		    (RF_DagNode_t *), allocList);
	} else {
		rodNodes = NULL;
	}

	/* begin node initialization */
	if (nRodNodes > 0) {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
		    NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
	} else {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
		    NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
	}

	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL,
	    nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList);
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL,
	    0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);

	/* initialize the Rod nodes */
	for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
		if (new_asm_h[asmNum]) {
			pda = new_asm_h[asmNum]->stripeMap->physInfo;
			while (pda) {
				rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,
				    rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
				    "Rod", allocList);
				rodNodes[nodeNum].params[0].p = pda;
				rodNodes[nodeNum].params[1].p = pda->bufPtr;
				rodNodes[nodeNum].params[2].v = parityStripeID;
				rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
				    0, 0, which_ru);
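				/*
				 * Throughout this file, params[3] packs four
				 * fields via RF_CREATE_PARAM3 (inferred from
				 * its call sites here): I/O priority, a lock
				 * flag, an unlock flag, and the
				 * reconstruction unit.
				 */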
				nodeNum++;
				pda = pda->next;
			}
		}
	}
	RF_ASSERT(nodeNum == nRodNodes);

	/* initialize the wnd nodes */
	pda = asmap->physInfo;
	for (i = 0; i < nWndNodes; i++) {
		rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
		    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
		RF_ASSERT(pda != NULL);
		wndNodes[i].params[0].p = pda;
		wndNodes[i].params[1].p = pda->bufPtr;
		wndNodes[i].params[2].v = parityStripeID;
		wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		pda = pda->next;
	}

	/* initialize the redundancy node */
	if (nRodNodes > 0) {
		rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
		    nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
		    "Xr ", allocList);
	} else {
		rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
		    1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
	}
	xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
	for (i = 0; i < nWndNodes; i++) {
		xorNode->params[2 * i + 0] = wndNodes[i].params[0];	/* pda */
		xorNode->params[2 * i + 1] = wndNodes[i].params[1];	/* buf ptr */
	}
	for (i = 0; i < nRodNodes; i++) {
		xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0];	/* pda */
		xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1];	/* buf ptr */
	}
	/* xor node needs to get at RAID information */
	xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;

	/*
	 * Look for a Rod node that reads a complete SU.  If none, alloc a buffer
	 * to receive the parity info.  Note that we can't use a new data buffer
	 * because it will not have gotten written when the xor occurs.
	 */
	if (allowBufferRecycle) {
		for (i = 0; i < nRodNodes; i++) {
			if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
				break;
		}
	}
	if ((!allowBufferRecycle) || (i == nRodNodes)) {
		RF_CallocAndAdd(xorNode->results[0], 1,
		    rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
		    (void *), allocList);
	} else {
		xorNode->results[0] = rodNodes[i].params[1].p;
	}

	/* initialize the Wnp node */
	rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
	    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
	wnpNode->params[0].p = asmap->parityInfo;
	wnpNode->params[1].p = xorNode->results[0];
	wnpNode->params[2].v = parityStripeID;
	wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
	/* parityInfo must describe entire parity unit */
	RF_ASSERT(asmap->parityInfo->next == NULL);

	if (nfaults == 2) {
		/*
		 * We never try to recycle a buffer for the Q calculation
		 * in addition to the parity.  This would cause two buffers
		 * to get smashed during the P and Q calculation, guaranteeing
		 * one would be wrong.
		 */
		RF_CallocAndAdd(xorNode->results[1], 1,
		    rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
		    (void *), allocList);
		rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
		    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
		wnqNode->params[0].p = asmap->qInfo;
		wnqNode->params[1].p = xorNode->results[1];
		wnqNode->params[2].v = parityStripeID;
		wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		/* qInfo must describe entire Q unit */
		RF_ASSERT(asmap->qInfo->next == NULL);
	}
	/*
	 * Connect nodes to form graph.
	 */

	/* connect dag header to block node */
	RF_ASSERT(blockNode->numAntecedents == 0);
	dag_h->succedents[0] = blockNode;

	if (nRodNodes > 0) {
		/* connect the block node to the Rod nodes */
		RF_ASSERT(blockNode->numSuccedents == nRodNodes);
		RF_ASSERT(xorNode->numAntecedents == nRodNodes);
		for (i = 0; i < nRodNodes; i++) {
			RF_ASSERT(rodNodes[i].numAntecedents == 1);
			blockNode->succedents[i] = &rodNodes[i];
			rodNodes[i].antecedents[0] = blockNode;
			rodNodes[i].antType[0] = rf_control;

			/* connect the Rod nodes to the Xor node */
			RF_ASSERT(rodNodes[i].numSuccedents == 1);
			rodNodes[i].succedents[0] = xorNode;
			xorNode->antecedents[i] = &rodNodes[i];
			xorNode->antType[i] = rf_trueData;
		}
	} else {
		/* connect the block node to the Xor node */
		RF_ASSERT(blockNode->numSuccedents == 1);
		RF_ASSERT(xorNode->numAntecedents == 1);
		blockNode->succedents[0] = xorNode;
		xorNode->antecedents[0] = blockNode;
		xorNode->antType[0] = rf_control;
	}

	/* connect the xor node to the commit node */
	RF_ASSERT(xorNode->numSuccedents == 1);
	RF_ASSERT(commitNode->numAntecedents == 1);
	xorNode->succedents[0] = commitNode;
	commitNode->antecedents[0] = xorNode;
	commitNode->antType[0] = rf_control;

	/* connect the commit node to the write nodes */
	RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numAntecedents == 1);
		commitNode->succedents[i] = &wndNodes[i];
		wndNodes[i].antecedents[0] = commitNode;
		wndNodes[i].antType[0] = rf_control;
	}
	RF_ASSERT(wnpNode->numAntecedents == 1);
	commitNode->succedents[nWndNodes] = wnpNode;
	wnpNode->antecedents[0] = commitNode;
	wnpNode->antType[0] = rf_trueData;
	if (nfaults == 2) {
		RF_ASSERT(wnqNode->numAntecedents == 1);
		commitNode->succedents[nWndNodes + 1] = wnqNode;
		wnqNode->antecedents[0] = commitNode;
		wnqNode->antType[0] = rf_trueData;
	}
	/* connect the write nodes to the term node */
	RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
	RF_ASSERT(termNode->numSuccedents == 0);
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numSuccedents == 1);
		wndNodes[i].succedents[0] = termNode;
		termNode->antecedents[i] = &wndNodes[i];
		termNode->antType[i] = rf_control;
	}
	RF_ASSERT(wnpNode->numSuccedents == 1);
	wnpNode->succedents[0] = termNode;
	termNode->antecedents[nWndNodes] = wnpNode;
	termNode->antType[nWndNodes] = rf_control;
	if (nfaults == 2) {
		RF_ASSERT(wnqNode->numSuccedents == 1);
		wnqNode->succedents[0] = termNode;
		termNode->antecedents[nWndNodes + 1] = wnqNode;
		termNode->antType[nWndNodes + 1] = rf_control;
	}
}

/******************************************************************************
 *
 * creates a DAG to perform a small-write operation (either raid 5 or pq),
 * which is as follows:
 *
 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
 *            \- Rod X      /    \-----> Wnd [Und] --/
 *           [\- Rod X     /      \----> Wnd [Und] -/]
 *           [\- Roq -> Q /        \---> Wnq [Unq] /]
 *
 * Rop = read old parity
 * Rod = read old data
 * Roq = read old "q"
 * Cmt = commit node
 * Und = unlock data disk
 * Unp = unlock parity disk
 * Unq = unlock q disk
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [ ] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              pfuncs    - list of parity generating functions
 *              qfuncs    - list of q generating functions
 *
 * A null qfuncs indicates a single-fault-tolerant array.
 *****************************************************************************/
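
/*
 * Illustrative sketch only (never compiled, see #if 0): the parity
 * arithmetic behind the small-write (read-modify-write) graph above.
 * For each byte, new parity = old parity ^ old data ^ new data; the
 * Xor node below computes exactly this relation over the accessed
 * region.  Function and parameter names here are hypothetical.
 */
#if 0
static void
example_rmw_parity(unsigned char *parity, const unsigned char *old_data,
    const unsigned char *new_data, size_t nbytes)
{
	size_t k;

	for (k = 0; k < nbytes; k++)
		parity[k] ^= old_data[k] ^ new_data[k];
}
#endif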

void
rf_CommonCreateSmallWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    RF_RedFuncs_t * pfuncs,
    RF_RedFuncs_t * qfuncs)
{
	RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
	RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
	RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
	RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
	int i, j, nNodes, totalNumNodes, lu_flag;
	RF_ReconUnitNum_t which_ru;
	int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
	int (*qfunc) (RF_DagNode_t *);
	int numDataNodes, numParityNodes;
	RF_StripeNum_t parityStripeID;
	RF_PhysDiskAddr_t *pda;
	char *name, *qname;
	long nfaults;

	nfaults = qfuncs ? 2 : 1;
	lu_flag = (rf_enableAtomicRMW) ? 1 : 0;	/* lock/unlock flag */

	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	pda = asmap->physInfo;
	numDataNodes = asmap->numStripeUnitsAccessed;
	numParityNodes = (asmap->parityInfo->next) ? 2 : 1;

	if (rf_dagDebug) {
		printf("[Creating small-write DAG]\n");
	}
	RF_ASSERT(numDataNodes > 0);
	dag_h->creator = "SmallWriteDAG";

	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/*
	 * DAG creation occurs in four steps:
	 * 1. count the number of nodes in the DAG
	 * 2. create the nodes
	 * 3. initialize the nodes
	 * 4. connect the nodes
	 */

	/*
	 * Step 1. compute number of nodes in the graph
	 */

	/*
	 * Number of nodes:
	 *   a read and a write for each data unit,
	 *   a redundancy computation node for each parity node
	 *     (nfaults * nparity),
	 *   a read and a write for each parity unit,
	 *   a block and a commit node (2),
	 *   a terminate node,
	 *   and, if atomic RMW, an unlock node for each data unit
	 *   and each redundancy unit.
	 */
	totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
	    + (nfaults * 2 * numParityNodes) + 3;
	if (lu_flag) {
		totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
	}
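	/*
	 * Worked example (illustrative): a two-unit write within one
	 * stripe of a RAID-5 set with contiguous parity (numDataNodes = 2,
	 * numParityNodes = 1, nfaults = 1, lu_flag = 0) yields
	 * 4 + 1 + 2 + 3 = 10 nodes: 2 Rod, 2 Wnd, 1 Rop, 1 Wnp, 1 Xor,
	 * plus the block, commit, and terminate nodes.
	 */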
	/*
	 * Step 2. create the nodes
	 */
	RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
	    (RF_DagNode_t *), allocList);
	i = 0;
	blockNode = &nodes[i];
	i += 1;
	commitNode = &nodes[i];
	i += 1;
	readDataNodes = &nodes[i];
	i += numDataNodes;
	readParityNodes = &nodes[i];
	i += numParityNodes;
	writeDataNodes = &nodes[i];
	i += numDataNodes;
	writeParityNodes = &nodes[i];
	i += numParityNodes;
	xorNodes = &nodes[i];
	i += numParityNodes;
	termNode = &nodes[i];
	i += 1;
	if (lu_flag) {
		unlockDataNodes = &nodes[i];
		i += numDataNodes;
		unlockParityNodes = &nodes[i];
		i += numParityNodes;
	} else {
		unlockDataNodes = unlockParityNodes = NULL;
	}
	if (nfaults == 2) {
		readQNodes = &nodes[i];
		i += numParityNodes;
		writeQNodes = &nodes[i];
		i += numParityNodes;
		qNodes = &nodes[i];
		i += numParityNodes;
		if (lu_flag) {
			unlockQNodes = &nodes[i];
			i += numParityNodes;
		} else {
			unlockQNodes = NULL;
		}
	} else {
		readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
	}
	RF_ASSERT(i == totalNumNodes);

	/*
	 * Step 3. initialize the nodes
	 */
	/* initialize block node (Nil) */
	nNodes = numDataNodes + (nfaults * numParityNodes);
	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
	    NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);

	/* initialize commit node (Cmt) */
	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
	    NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);

	/* initialize terminate node (Trm) */
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
	    NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);

	/* initialize nodes which read old data (Rod) */
	for (i = 0; i < numDataNodes; i++) {
		rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
		    rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h,
		    "Rod", allocList);
		RF_ASSERT(pda != NULL);
		/* physical disk addr desc */
		readDataNodes[i].params[0].p = pda;
		/* buffer to hold old data */
		readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
		    dag_h, pda, allocList);
		readDataNodes[i].params[2].v = parityStripeID;
		readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    lu_flag, 0, which_ru);
		pda = pda->next;
		for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
			readDataNodes[i].propList[j] = NULL;
		}
	}

	/* initialize nodes which read old parity (Rop) */
	pda = asmap->parityInfo;
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc,
		    rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4,
		    0, dag_h, "Rop", allocList);
		readParityNodes[i].params[0].p = pda;
		/* buffer to hold old parity */
		readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
		    dag_h, pda, allocList);
		readParityNodes[i].params[2].v = parityStripeID;
		readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    lu_flag, 0, which_ru);
		pda = pda->next;
		for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
			readParityNodes[i].propList[j] = NULL;
		}
	}

	/* initialize nodes which read old Q (Roq) */
	if (nfaults == 2) {
		pda = asmap->qInfo;
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(pda != NULL);
			rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
			    rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
			readQNodes[i].params[0].p = pda;
			/* buffer to hold old Q */
			readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda,
			    allocList);
			readQNodes[i].params[2].v = parityStripeID;
			readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
			    lu_flag, 0, which_ru);
			pda = pda->next;
			for (j = 0; j < readQNodes[i].numSuccedents; j++) {
				readQNodes[i].propList[j] = NULL;
			}
		}
	}
	/* initialize nodes which write new data (Wnd) */
	pda = asmap->physInfo;
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
		    "Wnd", allocList);
		/* physical disk addr desc */
		writeDataNodes[i].params[0].p = pda;
		/* buffer holding new data to be written */
		writeDataNodes[i].params[1].p = pda->bufPtr;
		writeDataNodes[i].params[2].v = parityStripeID;
		writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    0, 0, which_ru);
		if (lu_flag) {
			/* initialize node to unlock the disk queue */
			rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
			    rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
			    "Und", allocList);
			/* physical disk addr desc */
			unlockDataNodes[i].params[0].p = pda;
			unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
			    0, lu_flag, which_ru);
		}
		pda = pda->next;
	}

	/*
	 * Initialize nodes which compute new parity and Q.
	 */
	/*
	 * We use the simple XOR func in the double-XOR case, and when
	 * we're accessing only a portion of one stripe unit.  The distinction
	 * between the two is that the regular XOR func assumes that the targbuf
	 * is a full SU in size, and examines the pda associated with the buffer
	 * to decide where within the buffer to XOR the data, whereas
	 * the simple XOR func just XORs the data into the start of the buffer.
	 */
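#if 0
	/*
	 * Illustrative sketch only (never compiled): the "simple" behavior
	 * described above.  A simple XOR func combines the source into the
	 * start of the target buffer; a regular func would first compute an
	 * offset into a full-SU target buffer from the source pda.  Names
	 * here are hypothetical, not RAIDframe's.
	 */
	static void
	example_simple_xor(unsigned char *tgt, const unsigned char *src,
	    size_t nbytes)
	{
		size_t k;

		for (k = 0; k < nbytes; k++)
			tgt[k] ^= src[k];
	}
#endif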
	if ((numParityNodes == 2) || ((numDataNodes == 1)
	    && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
		func = pfuncs->simple;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->SimpleName;
		if (qfuncs) {
			qfunc = qfuncs->simple;
			qname = qfuncs->SimpleName;
		} else {
			qfunc = NULL;
			qname = NULL;
		}
	} else {
		func = pfuncs->regular;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->RegularName;
		if (qfuncs) {
			qfunc = qfuncs->regular;
			qname = qfuncs->RegularName;
		} else {
			qfunc = NULL;
			qname = NULL;
		}
	}
	/*
	 * Initialize the xor nodes: params are {pda,buf}
	 * from {Rod,Wnd,Rop} nodes, and raidPtr
	 */
	if (numParityNodes == 2) {
		/* double-xor case */
		for (i = 0; i < numParityNodes; i++) {
			/* note: no wakeup func for xor */
			rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL,
			    1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList);
			xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
			xorNodes[i].params[0] = readDataNodes[i].params[0];
			xorNodes[i].params[1] = readDataNodes[i].params[1];
			xorNodes[i].params[2] = readParityNodes[i].params[0];
			xorNodes[i].params[3] = readParityNodes[i].params[1];
			xorNodes[i].params[4] = writeDataNodes[i].params[0];
			xorNodes[i].params[5] = writeDataNodes[i].params[1];
			xorNodes[i].params[6].p = raidPtr;
			/* use old parity buf as target buf */
			xorNodes[i].results[0] = readParityNodes[i].params[1].p;
			if (nfaults == 2) {
				/* note: no wakeup func for qor */
				rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
				    (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList);
				qNodes[i].params[0] = readDataNodes[i].params[0];
				qNodes[i].params[1] = readDataNodes[i].params[1];
				qNodes[i].params[2] = readQNodes[i].params[0];
				qNodes[i].params[3] = readQNodes[i].params[1];
				qNodes[i].params[4] = writeDataNodes[i].params[0];
				qNodes[i].params[5] = writeDataNodes[i].params[1];
				qNodes[i].params[6].p = raidPtr;
				/* use old Q buf as target buf */
				qNodes[i].results[0] = readQNodes[i].params[1].p;
			}
		}
	} else {
		/* there is only one xor node in this case */
		rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1,
		    (numDataNodes + numParityNodes),
		    (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
		xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
		/*
		 * Note: the loop below deliberately runs one past the Rod
		 * nodes; readDataNodes is contiguous with readParityNodes
		 * in the nodes array (see Step 2 above), so index
		 * numDataNodes picks up the Rop node's pda and buffer.
		 */
		for (i = 0; i < numDataNodes + 1; i++) {
			/* set up params related to Rod and Rop nodes */
			xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
			xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer ptr */
		}
		for (i = 0; i < numDataNodes; i++) {
			/* set up params related to Wnd and Wnp nodes */
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =	/* pda */
			    writeDataNodes[i].params[0];
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =	/* buffer ptr */
			    writeDataNodes[i].params[1];
		}
		/* xor node needs to get at RAID information */
		xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
		xorNodes[0].results[0] = readParityNodes[0].params[1].p;
		if (nfaults == 2) {
			rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
			    (numDataNodes + numParityNodes),
			    (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
			    qname, allocList);
			for (i = 0; i < numDataNodes; i++) {
				/* set up params related to Rod */
				qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
				qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer ptr */
			}
			/* and read old q */
			qNodes[0].params[2 * numDataNodes + 0] =	/* pda */
			    readQNodes[0].params[0];
			qNodes[0].params[2 * numDataNodes + 1] =	/* buffer ptr */
			    readQNodes[0].params[1];
			for (i = 0; i < numDataNodes; i++) {
				/* set up params related to Wnd nodes */
				qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =	/* pda */
				    writeDataNodes[i].params[0];
				qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =	/* buffer ptr */
				    writeDataNodes[i].params[1];
			}
			/* q node needs to get at RAID information */
			qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
			qNodes[0].results[0] = readQNodes[0].params[1].p;
		}
	}

	/* initialize nodes which write new parity (Wnp) */
	pda = asmap->parityInfo;
	for (i = 0; i < numParityNodes; i++) {
		rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
		    "Wnp", allocList);
		RF_ASSERT(pda != NULL);
		/* physical disk addr desc */
		writeParityNodes[i].params[0].p = pda;
		/* buffer pointer for parity write, filled in by the xor node */
		writeParityNodes[i].params[1].p = xorNodes[i].results[0];
		writeParityNodes[i].params[2].v = parityStripeID;
		writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    0, 0, which_ru);
		if (lu_flag) {
			/* initialize node to unlock the disk queue */
			rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
			    rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
			    "Unp", allocList);
			/* physical disk addr desc */
			unlockParityNodes[i].params[0].p = pda;
			unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
			    0, lu_flag, which_ru);
		}
		pda = pda->next;
	}

	/* initialize nodes which write new Q (Wnq) */
	if (nfaults == 2) {
		pda = asmap->qInfo;
		for (i = 0; i < numParityNodes; i++) {
			rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
			    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
			    "Wnq", allocList);
			RF_ASSERT(pda != NULL);
			/* physical disk addr desc */
			writeQNodes[i].params[0].p = pda;
			/* buffer pointer for Q write, filled in by the q node */
			writeQNodes[i].params[1].p = qNodes[i].results[0];
			writeQNodes[i].params[2].v = parityStripeID;
			writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
			    0, 0, which_ru);
			if (lu_flag) {
				/* initialize node to unlock the disk queue */
				rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
				    rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
				    "Unq", allocList);
				/* physical disk addr desc */
				unlockQNodes[i].params[0].p = pda;
				unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
				    0, lu_flag, which_ru);
			}
			pda = pda->next;
		}
	}
	/*
	 * Step 4. connect the nodes.
	 */

	/* connect header to block node */
	dag_h->succedents[0] = blockNode;

	/* connect block node to read old data nodes */
	RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
	for (i = 0; i < numDataNodes; i++) {
		blockNode->succedents[i] = &readDataNodes[i];
		RF_ASSERT(readDataNodes[i].numAntecedents == 1);
		readDataNodes[i].antecedents[0] = blockNode;
		readDataNodes[i].antType[0] = rf_control;
	}

	/* connect block node to read old parity nodes */
	for (i = 0; i < numParityNodes; i++) {
		blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
		RF_ASSERT(readParityNodes[i].numAntecedents == 1);
		readParityNodes[i].antecedents[0] = blockNode;
		readParityNodes[i].antType[0] = rf_control;
	}

	/* connect block node to read old Q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
			RF_ASSERT(readQNodes[i].numAntecedents == 1);
			readQNodes[i].antecedents[0] = blockNode;
			readQNodes[i].antType[0] = rf_control;
		}
	}
	/* connect read old data nodes to xor nodes */
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
		for (j = 0; j < numParityNodes; j++) {
			RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
			readDataNodes[i].succedents[j] = &xorNodes[j];
			xorNodes[j].antecedents[i] = &readDataNodes[i];
			xorNodes[j].antType[i] = rf_trueData;
		}
	}

	/* connect read old data nodes to q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numDataNodes; i++) {
			for (j = 0; j < numParityNodes; j++) {
				RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
				readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
				qNodes[j].antecedents[i] = &readDataNodes[i];
				qNodes[j].antType[i] = rf_trueData;
			}
		}
	}
	/* connect read old parity nodes to xor nodes */
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
		for (j = 0; j < numParityNodes; j++) {
			readParityNodes[i].succedents[j] = &xorNodes[j];
			xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
			xorNodes[j].antType[numDataNodes + i] = rf_trueData;
		}
	}

	/* connect read old q nodes to q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
			for (j = 0; j < numParityNodes; j++) {
				readQNodes[i].succedents[j] = &qNodes[j];
				qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
				qNodes[j].antType[numDataNodes + i] = rf_trueData;
			}
		}
	}
	/* connect xor nodes to commit node */
	RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(xorNodes[i].numSuccedents == 1);
		xorNodes[i].succedents[0] = commitNode;
		commitNode->antecedents[i] = &xorNodes[i];
		commitNode->antType[i] = rf_control;
	}

	/* connect q nodes to commit node */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(qNodes[i].numSuccedents == 1);
			qNodes[i].succedents[0] = commitNode;
			commitNode->antecedents[i + numParityNodes] = &qNodes[i];
			commitNode->antType[i + numParityNodes] = rf_control;
		}
	}
	/* connect commit node to write nodes */
	RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
		commitNode->succedents[i] = &writeDataNodes[i];
		writeDataNodes[i].antecedents[0] = commitNode;
		writeDataNodes[i].antType[0] = rf_trueData;
	}
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
		commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
		writeParityNodes[i].antecedents[0] = commitNode;
		writeParityNodes[i].antType[0] = rf_trueData;
	}
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(writeQNodes[i].numAntecedents == 1);
			commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
			writeQNodes[i].antecedents[0] = commitNode;
			writeQNodes[i].antType[0] = rf_trueData;
		}
	}
	RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
	RF_ASSERT(termNode->numSuccedents == 0);
	for (i = 0; i < numDataNodes; i++) {
		if (lu_flag) {
			/* connect write new data nodes to unlock nodes */
			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
			RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
			writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
			unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
			unlockDataNodes[i].antType[0] = rf_control;

			/* connect unlock nodes to term node */
			RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
			unlockDataNodes[i].succedents[0] = termNode;
			termNode->antecedents[i] = &unlockDataNodes[i];
			termNode->antType[i] = rf_control;
		} else {
			/* connect write new data nodes to term node */
			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
			writeDataNodes[i].succedents[0] = termNode;
			termNode->antecedents[i] = &writeDataNodes[i];
			termNode->antType[i] = rf_control;
		}
	}

	for (i = 0; i < numParityNodes; i++) {
		if (lu_flag) {
			/* connect write new parity nodes to unlock nodes */
			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
			RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
			writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
			unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
			unlockParityNodes[i].antType[0] = rf_control;

			/* connect unlock nodes to term node */
			RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
			unlockParityNodes[i].succedents[0] = termNode;
			termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
			termNode->antType[numDataNodes + i] = rf_control;
		} else {
			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
			writeParityNodes[i].succedents[0] = termNode;
			termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
			termNode->antType[numDataNodes + i] = rf_control;
		}
	}

	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			if (lu_flag) {
				/* connect write new Q nodes to unlock nodes */
				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
				RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
				writeQNodes[i].succedents[0] = &unlockQNodes[i];
				unlockQNodes[i].antecedents[0] = &writeQNodes[i];
				unlockQNodes[i].antType[0] = rf_control;

				/* connect unlock nodes to term node */
				RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
				unlockQNodes[i].succedents[0] = termNode;
				termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
				termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
			} else {
				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
				writeQNodes[i].succedents[0] = termNode;
				termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
				termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
			}
		}
	}
}


/******************************************************************************
 * create a write graph (fault-free or degraded) for RAID level 1
 *
 * Hdr -> Commit -> Wpd -> Nil -> Trm
 *               -> Wsd ->
 *
 * The "Wpd" node writes data to the primary copy in the mirror pair
 * The "Wsd" node writes data to the secondary copy in the mirror pair
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *****************************************************************************/

void
rf_CreateRaidOneWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList)
{
	RF_DagNode_t *unblockNode, *termNode, *commitNode;
	RF_DagNode_t *nodes, *wndNode, *wmirNode;
	int nWndNodes, nWmirNodes, i;
	RF_ReconUnitNum_t which_ru;
	RF_PhysDiskAddr_t *pda, *pdaP;
	RF_StripeNum_t parityStripeID;

	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	if (rf_dagDebug) {
		printf("[Creating RAID level 1 write DAG]\n");
	}
	dag_h->creator = "RaidOneWriteDAG";

	/* 2 implies access not SU aligned */
	nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
	nWndNodes = (asmap->physInfo->next) ? 2 : 1;
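	/*
	 * Worked example (illustrative): an access that crosses a
	 * stripe-unit boundary arrives with two pdas on each chain, so a
	 * fault-free graph gets 2 Wpd and 2 Wsd nodes; if one side of the
	 * mirror has failed, the corresponding count is decremented below
	 * and the data is written to the surviving copy only.
	 */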

	/* alloc the Wnd nodes and the Wmir node */
	if (asmap->numDataFailed == 1)
		nWndNodes--;
	if (asmap->numParityFailed == 1)
		nWmirNodes--;

	/* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock
	 * + terminator) */
	RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
	    (RF_DagNode_t *), allocList);
	i = 0;
	wndNode = &nodes[i];
	i += nWndNodes;
	wmirNode = &nodes[i];
	i += nWmirNodes;
	commitNode = &nodes[i];
	i += 1;
	unblockNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));

	/* this dag can commit immediately */
	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* initialize the commit, unblock, and term nodes */
	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
	    NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList);
	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
	    NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
	    NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);

	/* initialize the wnd nodes */
	if (nWndNodes > 0) {
		pda = asmap->physInfo;
		for (i = 0; i < nWndNodes; i++) {
			rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
			RF_ASSERT(pda != NULL);
			wndNode[i].params[0].p = pda;
			wndNode[i].params[1].p = pda->bufPtr;
			wndNode[i].params[2].v = parityStripeID;
			wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
			pda = pda->next;
		}
		RF_ASSERT(pda == NULL);
	}
	/* initialize the mirror nodes */
	if (nWmirNodes > 0) {
		pda = asmap->physInfo;
		pdaP = asmap->parityInfo;
		for (i = 0; i < nWmirNodes; i++) {
			rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
			RF_ASSERT(pda != NULL);
			wmirNode[i].params[0].p = pdaP;
			wmirNode[i].params[1].p = pda->bufPtr;
			wmirNode[i].params[2].v = parityStripeID;
			wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
			pda = pda->next;
			pdaP = pdaP->next;
		}
		RF_ASSERT(pda == NULL);
		RF_ASSERT(pdaP == NULL);
	}
	/* link the header node to the commit node */
	RF_ASSERT(dag_h->numSuccedents == 1);
	RF_ASSERT(commitNode->numAntecedents == 0);
	dag_h->succedents[0] = commitNode;

	/* link the commit node to the write nodes */
	RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numAntecedents == 1);
		commitNode->succedents[i] = &wndNode[i];
		wndNode[i].antecedents[0] = commitNode;
		wndNode[i].antType[0] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numAntecedents == 1);
		commitNode->succedents[i + nWndNodes] = &wmirNode[i];
		wmirNode[i].antecedents[0] = commitNode;
		wmirNode[i].antType[0] = rf_control;
	}

	/* link the write nodes to the unblock node */
	RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numSuccedents == 1);
		wndNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i] = &wndNode[i];
		unblockNode->antType[i] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numSuccedents == 1);
		wmirNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
		unblockNode->antType[i + nWndNodes] = rf_control;
	}

	/* link the unblock node to the term node */
	RF_ASSERT(unblockNode->numSuccedents == 1);
	RF_ASSERT(termNode->numAntecedents == 1);
	RF_ASSERT(termNode->numSuccedents == 0);
	unblockNode->succedents[0] = termNode;
	termNode->antecedents[0] = unblockNode;
	termNode->antType[0] = rf_control;
}


/* DAGs which have no commit points.
 *
 * The following DAGs are used in forward and backward error recovery
 * experiments.  They are identical to the DAGs above this comment except
 * that the commit points have been removed.
 */
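
/*
 * Note: in the forward variants below, a second "Nil" sync node stands
 * where the "Cmt" node sits in the graphs above, and numCommitNodes is
 * left at 0, so these graphs carry no commit point at all.
 */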


void
rf_CommonCreateLargeWriteDAGFwd(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    int nfaults,
    int (*redFunc) (RF_DagNode_t *),
    int allowBufferRecycle)
{
	RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
	RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
	int nWndNodes, nRodNodes, i, nodeNum, asmNum;
	RF_AccessStripeMapHeader_t *new_asm_h[2];
	RF_StripeNum_t parityStripeID;
	char *sosBuffer, *eosBuffer;
	RF_ReconUnitNum_t which_ru;
	RF_RaidLayout_t *layoutPtr;
	RF_PhysDiskAddr_t *pda;

	layoutPtr = &(raidPtr->Layout);
	parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr,
	    asmap->raidAddress, &which_ru);

	if (rf_dagDebug)
		printf("[Creating large-write DAG]\n");
	dag_h->creator = "LargeWriteDAGFwd";

	dag_h->numCommitNodes = 0;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* alloc the nodes: Wnd, xor, sync, block, term, and Wnp */
	nWndNodes = asmap->numStripeUnitsAccessed;
	RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
	    (RF_DagNode_t *), allocList);
	i = 0;
	wndNodes = &nodes[i];
	i += nWndNodes;
	xorNode = &nodes[i];
	i += 1;
	wnpNode = &nodes[i];
	i += 1;
	blockNode = &nodes[i];
	i += 1;
	syncNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	if (nfaults == 2) {
		wnqNode = &nodes[i];
		i += 1;
	} else {
		wnqNode = NULL;
	}
	rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h,
	    &nRodNodes, &sosBuffer, &eosBuffer, allocList);
	if (nRodNodes > 0) {
		RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
		    (RF_DagNode_t *), allocList);
	} else {
		rodNodes = NULL;
	}

	/* begin node initialization */
	if (nRodNodes > 0) {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
		    NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
		rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
		    NULL, nWndNodes + 1, nRodNodes, 0, 0, dag_h, "Nil", allocList);
	} else {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
		    NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
		rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
		    NULL, nWndNodes + 1, 1, 0, 0, dag_h, "Nil", allocList);
	}

	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
	    NULL, 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);

	/* initialize the Rod nodes */
	for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
		if (new_asm_h[asmNum]) {
			pda = new_asm_h[asmNum]->stripeMap->physInfo;
			while (pda) {
				rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,
				    rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
				    "Rod", allocList);
				rodNodes[nodeNum].params[0].p = pda;
				rodNodes[nodeNum].params[1].p = pda->bufPtr;
				rodNodes[nodeNum].params[2].v = parityStripeID;
				rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
				    0, 0, which_ru);
				nodeNum++;
				pda = pda->next;
			}
		}
	}
	RF_ASSERT(nodeNum == nRodNodes);

	/* initialize the wnd nodes */
	pda = asmap->physInfo;
	for (i = 0; i < nWndNodes; i++) {
		rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
		    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
		RF_ASSERT(pda != NULL);
		wndNodes[i].params[0].p = pda;
		wndNodes[i].params[1].p = pda->bufPtr;
		wndNodes[i].params[2].v = parityStripeID;
		wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		pda = pda->next;
	}

	/* initialize the redundancy node */
	rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
	    nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
	xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
	for (i = 0; i < nWndNodes; i++) {
		xorNode->params[2 * i + 0] = wndNodes[i].params[0];	/* pda */
		xorNode->params[2 * i + 1] = wndNodes[i].params[1];	/* buf ptr */
	}
	for (i = 0; i < nRodNodes; i++) {
		xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0];	/* pda */
		xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1];	/* buf ptr */
	}
	/* xor node needs to get at RAID information */
	xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;

	/*
	 * Look for a Rod node that reads a complete SU.  If none, alloc a
	 * buffer to receive the parity info.  Note that we can't use a new
	 * data buffer because it will not have gotten written when the xor
	 * occurs.
	 */
	if (allowBufferRecycle) {
		for (i = 0; i < nRodNodes; i++)
			if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
				break;
	}
	if ((!allowBufferRecycle) || (i == nRodNodes)) {
		RF_CallocAndAdd(xorNode->results[0], 1,
		    rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
		    (void *), allocList);
	} else
		xorNode->results[0] = rodNodes[i].params[1].p;

	/* initialize the Wnp node */
	rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
	    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
	wnpNode->params[0].p = asmap->parityInfo;
	wnpNode->params[1].p = xorNode->results[0];
	wnpNode->params[2].v = parityStripeID;
	wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
	/* parityInfo must describe entire parity unit */
	RF_ASSERT(asmap->parityInfo->next == NULL);

	if (nfaults == 2) {
		/*
		 * We never try to recycle a buffer for the Q calculation in
		 * addition to the parity.  This would cause two buffers to get
		 * smashed during the P and Q calculation, guaranteeing one
		 * would be wrong.
		 */
		RF_CallocAndAdd(xorNode->results[1], 1,
		    rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
		    (void *), allocList);
		rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
		    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
		wnqNode->params[0].p = asmap->qInfo;
		wnqNode->params[1].p = xorNode->results[1];
		wnqNode->params[2].v = parityStripeID;
		wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		/* qInfo must describe entire Q unit */
		RF_ASSERT(asmap->qInfo->next == NULL);
	}
	/* connect nodes to form graph */

	/* connect dag header to block node */
	RF_ASSERT(blockNode->numAntecedents == 0);
	dag_h->succedents[0] = blockNode;

	if (nRodNodes > 0) {
		/* connect the block node to the Rod nodes */
		RF_ASSERT(blockNode->numSuccedents == nRodNodes);
		RF_ASSERT(syncNode->numAntecedents == nRodNodes);
		for (i = 0; i < nRodNodes; i++) {
			RF_ASSERT(rodNodes[i].numAntecedents == 1);
			blockNode->succedents[i] = &rodNodes[i];
			rodNodes[i].antecedents[0] = blockNode;
			rodNodes[i].antType[0] = rf_control;

			/* connect the Rod nodes to the Nil node */
			RF_ASSERT(rodNodes[i].numSuccedents == 1);
			rodNodes[i].succedents[0] = syncNode;
			syncNode->antecedents[i] = &rodNodes[i];
			syncNode->antType[i] = rf_trueData;
		}
	} else {
		/* connect the block node to the Nil node */
		RF_ASSERT(blockNode->numSuccedents == 1);
		RF_ASSERT(syncNode->numAntecedents == 1);
		blockNode->succedents[0] = syncNode;
		syncNode->antecedents[0] = blockNode;
		syncNode->antType[0] = rf_control;
	}

	/* connect the sync node to the Wnd nodes */
	RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numAntecedents == 1);
		syncNode->succedents[i] = &wndNodes[i];
		wndNodes[i].antecedents[0] = syncNode;
		wndNodes[i].antType[0] = rf_control;
	}

	/* connect the sync node to the Xor node */
	RF_ASSERT(xorNode->numAntecedents == 1);
	syncNode->succedents[nWndNodes] = xorNode;
	xorNode->antecedents[0] = syncNode;
	xorNode->antType[0] = rf_control;

	/* connect the xor node to the write parity node */
	RF_ASSERT(xorNode->numSuccedents == nfaults);
	RF_ASSERT(wnpNode->numAntecedents == 1);
	xorNode->succedents[0] = wnpNode;
	wnpNode->antecedents[0] = xorNode;
	wnpNode->antType[0] = rf_trueData;
	if (nfaults == 2) {
		RF_ASSERT(wnqNode->numAntecedents == 1);
		xorNode->succedents[1] = wnqNode;
		wnqNode->antecedents[0] = xorNode;
		wnqNode->antType[0] = rf_trueData;
	}
	/* connect the write nodes to the term node */
	RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
	RF_ASSERT(termNode->numSuccedents == 0);
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numSuccedents == 1);
		wndNodes[i].succedents[0] = termNode;
		termNode->antecedents[i] = &wndNodes[i];
		termNode->antType[i] = rf_control;
	}
	RF_ASSERT(wnpNode->numSuccedents == 1);
	wnpNode->succedents[0] = termNode;
	termNode->antecedents[nWndNodes] = wnpNode;
	termNode->antType[nWndNodes] = rf_control;
	if (nfaults == 2) {
		RF_ASSERT(wnqNode->numSuccedents == 1);
		wnqNode->succedents[0] = termNode;
		termNode->antecedents[nWndNodes + 1] = wnqNode;
		termNode->antType[nWndNodes + 1] = rf_control;
	}
}


/******************************************************************************
 *
 * creates a DAG to perform a small-write operation (either raid 5 or pq),
 * which is as follows:
 *
 * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
 *            \- Rod X- Wnd [Und] -------/
 *           [\- Rod X- Wnd [Und] ------/]
 *           [\- Roq - Q --> Wnq [Unq] -/]
 *
 * Rop = read old parity
 * Rod = read old data
 * Roq = read old "q"
 * Und = unlock data disk
 * Unp = unlock parity disk
 * Unq = unlock q disk
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [ ] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              pfuncs    - list of parity generating functions
 *              qfuncs    - list of q generating functions
 *
 * A null qfuncs indicates a single-fault-tolerant array.
 *****************************************************************************/
1485
1486 void
1487 rf_CommonCreateSmallWriteDAGFwd(
1488 RF_Raid_t * raidPtr,
1489 RF_AccessStripeMap_t * asmap,
1490 RF_DagHeader_t * dag_h,
1491 void *bp,
1492 RF_RaidAccessFlags_t flags,
1493 RF_AllocListElem_t * allocList,
1494 RF_RedFuncs_t * pfuncs,
1495 RF_RedFuncs_t * qfuncs)
1496 {
1497 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
1498 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
1499 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
1500 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
1501 int i, j, nNodes, totalNumNodes, lu_flag;
1502 RF_ReconUnitNum_t which_ru;
1503 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
1504 int (*qfunc) (RF_DagNode_t *);
1505 int numDataNodes, numParityNodes;
1506 RF_StripeNum_t parityStripeID;
1507 RF_PhysDiskAddr_t *pda;
1508 char *name, *qname;
1509 long nfaults;
1510
1511 nfaults = qfuncs ? 2 : 1;
1512 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
1513
1514 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
1515 pda = asmap->physInfo;
1516 numDataNodes = asmap->numStripeUnitsAccessed;
1517 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
1518
1519 if (rf_dagDebug)
1520 printf("[Creating small-write DAG]\n");
1521 RF_ASSERT(numDataNodes > 0);
1522 dag_h->creator = "SmallWriteDAGFwd";
1523
1524 dag_h->numCommitNodes = 0;
1525 dag_h->numCommits = 0;
1526 dag_h->numSuccedents = 1;
1527
1528 qfunc = NULL;
1529 qname = NULL;
1530
1531 	/* DAG creation occurs in four steps: 1. count the number of nodes in
1532 	 * the DAG, 2. create the nodes, 3. initialize the nodes, 4. connect
1533 	 * the nodes */
1534
1535 /* Step 1. compute number of nodes in the graph */
1536
1537 	/* number of nodes: a read and a write for each data unit, a redundancy
1538 	 * computation node for each parity unit (nfaults * numParityNodes), a
1539 	 * read and a write for each parity unit, a block node, a terminate
1540 	 * node, and, if atomic RMW, an unlock node for each data unit and each
	 * redundancy unit */
1541 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) + (nfaults * 2 * numParityNodes) + 2;
1542 if (lu_flag)
1543 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
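
	/* worked example (illustrative): a raid 5 small write with
	 * numDataNodes == 2, numParityNodes == 1, nfaults == 1, and
	 * lu_flag == 0 gives (2 * 2) + 1 + 2 + 2 = 9 nodes:
	 * 2 Rod, 2 Wnd, 1 Rop, 1 Wnp, 1 Xor, plus Nil and Trm */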
1544
1545
1546 /* Step 2. create the nodes */
1547 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1548 i = 0;
1549 blockNode = &nodes[i];
1550 i += 1;
1551 readDataNodes = &nodes[i];
1552 i += numDataNodes;
1553 readParityNodes = &nodes[i];
1554 i += numParityNodes;
1555 writeDataNodes = &nodes[i];
1556 i += numDataNodes;
1557 writeParityNodes = &nodes[i];
1558 i += numParityNodes;
1559 xorNodes = &nodes[i];
1560 i += numParityNodes;
1561 termNode = &nodes[i];
1562 i += 1;
1563 if (lu_flag) {
1564 unlockDataNodes = &nodes[i];
1565 i += numDataNodes;
1566 unlockParityNodes = &nodes[i];
1567 i += numParityNodes;
1568 } else {
1569 unlockDataNodes = unlockParityNodes = NULL;
1570 }
1571 if (nfaults == 2) {
1572 readQNodes = &nodes[i];
1573 i += numParityNodes;
1574 writeQNodes = &nodes[i];
1575 i += numParityNodes;
1576 qNodes = &nodes[i];
1577 i += numParityNodes;
1578 if (lu_flag) {
1579 unlockQNodes = &nodes[i];
1580 i += numParityNodes;
1581 } else {
1582 unlockQNodes = NULL;
1583 }
1584 } else {
1585 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
1586 }
1587 RF_ASSERT(i == totalNumNodes);
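
	/* the nodes[] array is carved up in the order above: Nil, Rod, Rop,
	 * Wnd, Wnp, Xor, Trm, then the optional unlock and Q nodes.  note
	 * that the Rop nodes sit immediately after the Rod nodes; the
	 * single-xor parameter setup in Step 3 below relies on that
	 * adjacency */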
1588
1589 /* Step 3. initialize the nodes */
1590 /* initialize block node (Nil) */
1591 nNodes = numDataNodes + (nfaults * numParityNodes);
1592 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
1593
1594 /* initialize terminate node (Trm) */
1595 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
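
	/* note: as used throughout this file, the four integer arguments to
	 * rf_InitNode() are numSuccedents, numAntecedents, numParams, and
	 * numResults, in that order -- hence Nil above gets nNodes succedents
	 * and no antecedents, while Trm gets the reverse */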
1596
1597 /* initialize nodes which read old data (Rod) */
1598 for (i = 0; i < numDataNodes; i++) {
1599 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h, "Rod", allocList);
1600 RF_ASSERT(pda != NULL);
1601 readDataNodes[i].params[0].p = pda; /* physical disk addr
1602 * desc */
1603 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old
1604 * data */
1605 readDataNodes[i].params[2].v = parityStripeID;
1606 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1607 pda = pda->next;
1608 for (j = 0; j < readDataNodes[i].numSuccedents; j++)
1609 readDataNodes[i].propList[j] = NULL;
1610 }
1611
1612 /* initialize nodes which read old parity (Rop) */
1613 pda = asmap->parityInfo;
1615 for (i = 0; i < numParityNodes; i++) {
1616 RF_ASSERT(pda != NULL);
1617 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
1618 readParityNodes[i].params[0].p = pda;
1619 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old
1620 * parity */
1621 readParityNodes[i].params[2].v = parityStripeID;
1622 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1623 for (j = 0; j < readParityNodes[i].numSuccedents; j++)
1624 			readParityNodes[i].propList[j] = NULL;
1625 pda = pda->next;
1626 }
1627
1628 /* initialize nodes which read old Q (Roq) */
1629 if (nfaults == 2) {
1630 pda = asmap->qInfo;
1631 for (i = 0; i < numParityNodes; i++) {
1632 RF_ASSERT(pda != NULL);
1633 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
1634 readQNodes[i].params[0].p = pda;
1635 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old Q */
1636 readQNodes[i].params[2].v = parityStripeID;
1637 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1638 for (j = 0; j < readQNodes[i].numSuccedents; j++)
1639 				readQNodes[i].propList[j] = NULL;
1640 pda = pda->next;
1641 }
1642 }
1643 /* initialize nodes which write new data (Wnd) */
1644 pda = asmap->physInfo;
1645 for (i = 0; i < numDataNodes; i++) {
1646 RF_ASSERT(pda != NULL);
1647 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1648 writeDataNodes[i].params[0].p = pda; /* physical disk addr
1649 * desc */
1650 writeDataNodes[i].params[1].p = pda->bufPtr; /* buffer holding new
1651 * data to be written */
1652 writeDataNodes[i].params[2].v = parityStripeID;
1653 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1654
1655 if (lu_flag) {
1656 /* initialize node to unlock the disk queue */
1657 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
1658 unlockDataNodes[i].params[0].p = pda; /* physical disk addr
1659 * desc */
1660 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1661 }
1662 pda = pda->next;
1663 }
1664
1665
1666 /* initialize nodes which compute new parity and Q */
1667 /* we use the simple XOR func in the double-XOR case, and when we're
1668 * accessing only a portion of one stripe unit. the distinction
1669 * between the two is that the regular XOR func assumes that the
1670 * targbuf is a full SU in size, and examines the pda associated with
1671 * the buffer to decide where within the buffer to XOR the data,
1672 * whereas the simple XOR func just XORs the data into the start of
1673 * the buffer. */
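	/* rough sketch of that distinction (hypothetical helper and variable
	 * names, not the actual RAIDframe XOR routines):
	 *
	 *	// simple: XOR the source into the start of the target
	 *	for (s = 0; s < len; s++)
	 *		targbuf[s] ^= srcbuf[s];
	 *
	 *	// regular: locate the region inside a full-SU target buffer
	 *	// from the buffer's pda before XORing
	 *	off = offset_within_su(pda) * bytesPerSector;
	 *	for (s = 0; s < len; s++)
	 *		targbuf[off + s] ^= srcbuf[s];
	 */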
1674 if ((numParityNodes == 2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
1675 func = pfuncs->simple;
1676 undoFunc = rf_NullNodeUndoFunc;
1677 name = pfuncs->SimpleName;
1678 if (qfuncs) {
1679 qfunc = qfuncs->simple;
1680 qname = qfuncs->SimpleName;
1681 }
1682 } else {
1683 func = pfuncs->regular;
1684 undoFunc = rf_NullNodeUndoFunc;
1685 name = pfuncs->RegularName;
1686 if (qfuncs) {
1687 qfunc = qfuncs->regular;
1688 qname = qfuncs->RegularName;
1689 }
1690 }
1691 /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop}
1692 * nodes, and raidPtr */
1693 if (numParityNodes == 2) { /* double-xor case */
1694 for (i = 0; i < numParityNodes; i++) {
1695 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, name, allocList); /* no wakeup func for
1696 * xor */
1697 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
1698 xorNodes[i].params[0] = readDataNodes[i].params[0];
1699 xorNodes[i].params[1] = readDataNodes[i].params[1];
1700 xorNodes[i].params[2] = readParityNodes[i].params[0];
1701 xorNodes[i].params[3] = readParityNodes[i].params[1];
1702 xorNodes[i].params[4] = writeDataNodes[i].params[0];
1703 xorNodes[i].params[5] = writeDataNodes[i].params[1];
1704 xorNodes[i].params[6].p = raidPtr;
1705 xorNodes[i].results[0] = readParityNodes[i].params[1].p; /* use old parity buf as
1706 * target buf */
1707 if (nfaults == 2) {
1708 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, qname, allocList); /* no wakeup func for
1709 * xor */
1710 qNodes[i].params[0] = readDataNodes[i].params[0];
1711 qNodes[i].params[1] = readDataNodes[i].params[1];
1712 qNodes[i].params[2] = readQNodes[i].params[0];
1713 qNodes[i].params[3] = readQNodes[i].params[1];
1714 qNodes[i].params[4] = writeDataNodes[i].params[0];
1715 qNodes[i].params[5] = writeDataNodes[i].params[1];
1716 qNodes[i].params[6].p = raidPtr;
1717 qNodes[i].results[0] = readQNodes[i].params[1].p; /* use old Q buf as
1718 * target buf */
1719 }
1720 }
1721 } else {
1722 /* there is only one xor node in this case */
1723 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
1724 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
1725 for (i = 0; i < numDataNodes + 1; i++) {
1726 /* set up params related to Rod and Rop nodes */
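			/* note: when i == numDataNodes this indexes one
			 * element past the Rod nodes, i.e. readParityNodes[0];
			 * the nodes were allocated contiguously in Step 2, so
			 * the final iteration picks up the Rop params */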
1727 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
1728 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */
1729 }
1730 for (i = 0; i < numDataNodes; i++) {
1731 /* set up params related to Wnd and Wnp nodes */
1732 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */
1733 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */
1734 }
1735 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get
1736 * at RAID information */
1737 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
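
		/* resulting parameter layout (illustrative, for
		 * numDataNodes == 2): params[0..3] = Rod0/Rod1 (pda, buf)
		 * pairs, params[4..5] = Rop (pda, buf), params[6..9] =
		 * Wnd0/Wnd1 (pda, buf) pairs, params[10] = raidPtr --
		 * 2 * (2 + 2 + 1) + 1 = 11 params in all */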
1738 if (nfaults == 2) {
1739 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, qname, allocList);
1740 for (i = 0; i < numDataNodes; i++) {
1741 /* set up params related to Rod */
1742 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
1743 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */
1744 }
1745 /* and read old q */
1746 qNodes[0].params[2 * numDataNodes + 0] = readQNodes[0].params[0]; /* pda */
1747 qNodes[0].params[2 * numDataNodes + 1] = readQNodes[0].params[1]; /* buffer pointer */
1748 for (i = 0; i < numDataNodes; i++) {
1749 /* set up params related to Wnd nodes */
1750 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */
1751 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */
1752 }
1753 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get
1754 * at RAID information */
1755 qNodes[0].results[0] = readQNodes[0].params[1].p;
1756 }
1757 }
1758
1759 /* initialize nodes which write new parity (Wnp) */
1760 pda = asmap->parityInfo;
1761 for (i = 0; i < numParityNodes; i++) {
1762 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnp", allocList);
1763 RF_ASSERT(pda != NULL);
1764 writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr)
1765 * filled in by xor node */
1766 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for
1767 * parity write
1768 * operation */
1769 writeParityNodes[i].params[2].v = parityStripeID;
1770 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1771
1772 if (lu_flag) {
1773 /* initialize node to unlock the disk queue */
1774 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp", allocList);
1775 unlockParityNodes[i].params[0].p = pda; /* physical disk addr
1776 * desc */
1777 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1778 }
1779 pda = pda->next;
1780 }
1781
1782 /* initialize nodes which write new Q (Wnq) */
1783 if (nfaults == 2) {
1784 pda = asmap->qInfo;
1785 for (i = 0; i < numParityNodes; i++) {
1786 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnq", allocList);
1787 RF_ASSERT(pda != NULL);
1788 writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr)
1789 * filled in by xor node */
1790 			writeQNodes[i].params[1].p = qNodes[i].results[0];	/* buffer pointer for
1791 								 * Q write
1792 								 * operation */
1793 writeQNodes[i].params[2].v = parityStripeID;
1794 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1795
1796 if (lu_flag) {
1797 /* initialize node to unlock the disk queue */
1798 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unq", allocList);
1799 unlockQNodes[i].params[0].p = pda; /* physical disk addr
1800 * desc */
1801 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1802 }
1803 pda = pda->next;
1804 }
1805 }
1806 /* Step 4. connect the nodes */
1807
1808 /* connect header to block node */
1809 dag_h->succedents[0] = blockNode;
1810
1811 /* connect block node to read old data nodes */
1812 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
1813 for (i = 0; i < numDataNodes; i++) {
1814 blockNode->succedents[i] = &readDataNodes[i];
1815 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
1816 readDataNodes[i].antecedents[0] = blockNode;
1817 readDataNodes[i].antType[0] = rf_control;
1818 }
1819
1820 /* connect block node to read old parity nodes */
1821 for (i = 0; i < numParityNodes; i++) {
1822 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
1823 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
1824 readParityNodes[i].antecedents[0] = blockNode;
1825 readParityNodes[i].antType[0] = rf_control;
1826 }
1827
1828 /* connect block node to read old Q nodes */
1829 if (nfaults == 2)
1830 for (i = 0; i < numParityNodes; i++) {
1831 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
1832 RF_ASSERT(readQNodes[i].numAntecedents == 1);
1833 readQNodes[i].antecedents[0] = blockNode;
1834 readQNodes[i].antType[0] = rf_control;
1835 }
1836
1837 /* connect read old data nodes to write new data nodes */
1838 for (i = 0; i < numDataNodes; i++) {
1839 RF_ASSERT(readDataNodes[i].numSuccedents == ((nfaults * numParityNodes) + 1));
1840 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
1841 readDataNodes[i].succedents[0] = &writeDataNodes[i];
1842 writeDataNodes[i].antecedents[0] = &readDataNodes[i];
1843 writeDataNodes[i].antType[0] = rf_antiData;
1844 }
1845
1846 /* connect read old data nodes to xor nodes */
1847 for (i = 0; i < numDataNodes; i++) {
1848 for (j = 0; j < numParityNodes; j++) {
1849 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
1850 readDataNodes[i].succedents[1 + j] = &xorNodes[j];
1851 xorNodes[j].antecedents[i] = &readDataNodes[i];
1852 xorNodes[j].antType[i] = rf_trueData;
1853 }
1854 }
1855
1856 /* connect read old data nodes to q nodes */
1857 if (nfaults == 2)
1858 for (i = 0; i < numDataNodes; i++)
1859 for (j = 0; j < numParityNodes; j++) {
1860 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
1861 readDataNodes[i].succedents[1 + numParityNodes + j] = &qNodes[j];
1862 qNodes[j].antecedents[i] = &readDataNodes[i];
1863 qNodes[j].antType[i] = rf_trueData;
1864 }
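
	/* each Rod node's succedent slots are now: [0] = its Wnd node,
	 * [1 .. numParityNodes] = the Xor nodes, and, when nfaults == 2,
	 * [1 + numParityNodes ..] = the Q nodes -- matching the
	 * (numParityNodes * nfaults) + 1 succedent count from Step 3 */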
1865
1866 /* connect read old parity nodes to xor nodes */
1867 for (i = 0; i < numParityNodes; i++) {
1868 for (j = 0; j < numParityNodes; j++) {
1869 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
1870 readParityNodes[i].succedents[j] = &xorNodes[j];
1871 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
1872 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
1873 }
1874 }
1875
1876 /* connect read old q nodes to q nodes */
1877 if (nfaults == 2)
1878 for (i = 0; i < numParityNodes; i++) {
1879 for (j = 0; j < numParityNodes; j++) {
1880 RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
1881 readQNodes[i].succedents[j] = &qNodes[j];
1882 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
1883 qNodes[j].antType[numDataNodes + i] = rf_trueData;
1884 }
1885 }
1886
1887 /* connect xor nodes to the write new parity nodes */
1888 for (i = 0; i < numParityNodes; i++) {
1889 RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
1890 for (j = 0; j < numParityNodes; j++) {
1891 RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
1892 xorNodes[i].succedents[j] = &writeParityNodes[j];
1893 writeParityNodes[j].antecedents[i] = &xorNodes[i];
1894 writeParityNodes[j].antType[i] = rf_trueData;
1895 }
1896 }
1897
1898 /* connect q nodes to the write new q nodes */
1899 if (nfaults == 2)
1900 for (i = 0; i < numParityNodes; i++) {
1901 RF_ASSERT(writeQNodes[i].numAntecedents == numParityNodes);
1902 for (j = 0; j < numParityNodes; j++) {
1903 			RF_ASSERT(qNodes[j].numSuccedents == numParityNodes);
1904 qNodes[i].succedents[j] = &writeQNodes[j];
1905 writeQNodes[j].antecedents[i] = &qNodes[i];
1906 writeQNodes[j].antType[i] = rf_trueData;
1907 }
1908 }
1909
1910 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1911 RF_ASSERT(termNode->numSuccedents == 0);
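	/* term node antecedent slot layout: [0 .. numDataNodes-1] = Wnd
	 * (or Und when locking), [numDataNodes ..] = Wnp (or Unp), and,
	 * when nfaults == 2, [numDataNodes + numParityNodes ..] = Wnq
	 * (or Unq) */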
1912 for (i = 0; i < numDataNodes; i++) {
1913 if (lu_flag) {
1914 /* connect write new data nodes to unlock nodes */
1915 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1916 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1917 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1918 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1919 unlockDataNodes[i].antType[0] = rf_control;
1920
1921 /* connect unlock nodes to term node */
1922 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1923 unlockDataNodes[i].succedents[0] = termNode;
1924 termNode->antecedents[i] = &unlockDataNodes[i];
1925 termNode->antType[i] = rf_control;
1926 } else {
1927 /* connect write new data nodes to term node */
1928 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1929 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1930 writeDataNodes[i].succedents[0] = termNode;
1931 termNode->antecedents[i] = &writeDataNodes[i];
1932 termNode->antType[i] = rf_control;
1933 }
1934 }
1935
1936 for (i = 0; i < numParityNodes; i++) {
1937 if (lu_flag) {
1938 /* connect write new parity nodes to unlock nodes */
1939 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1940 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1941 writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
1942 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
1943 unlockParityNodes[i].antType[0] = rf_control;
1944
1945 /* connect unlock nodes to term node */
1946 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1947 unlockParityNodes[i].succedents[0] = termNode;
1948 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
1949 termNode->antType[numDataNodes + i] = rf_control;
1950 } else {
1951 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1952 writeParityNodes[i].succedents[0] = termNode;
1953 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
1954 termNode->antType[numDataNodes + i] = rf_control;
1955 }
1956 }
1957
1958 if (nfaults == 2)
1959 for (i = 0; i < numParityNodes; i++) {
1960 if (lu_flag) {
1961 /* connect write new Q nodes to unlock nodes */
1962 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1963 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1964 writeQNodes[i].succedents[0] = &unlockQNodes[i];
1965 unlockQNodes[i].antecedents[0] = &writeQNodes[i];
1966 unlockQNodes[i].antType[0] = rf_control;
1967
1968 				/* connect unlock nodes to term node */
1969 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1970 unlockQNodes[i].succedents[0] = termNode;
1971 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
1972 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1973 } else {
1974 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1975 writeQNodes[i].succedents[0] = termNode;
1976 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
1977 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1978 }
1979 }
1980 }
1981
1982
1983
1984 /******************************************************************************
1985 * create a write graph (fault-free or degraded) for RAID level 1
1986 *
1987  * Hdr -> Nil -> Wpd -> Nil -> Trm
1988  *           \-> Wsd ->/
1989 *
1990 * The "Wpd" node writes data to the primary copy in the mirror pair
1991 * The "Wsd" node writes data to the secondary copy in the mirror pair
1992 *
1993 * Parameters: raidPtr - description of the physical array
1994 * asmap - logical & physical addresses for this access
1995 * bp - buffer ptr (holds write data)
1996 * flags - general flags (e.g. disk locking)
1997 * allocList - list of memory allocated in DAG creation
1998 *****************************************************************************/
1999
2000 void
2001 rf_CreateRaidOneWriteDAGFwd(
2002 RF_Raid_t * raidPtr,
2003 RF_AccessStripeMap_t * asmap,
2004 RF_DagHeader_t * dag_h,
2005 void *bp,
2006 RF_RaidAccessFlags_t flags,
2007 RF_AllocListElem_t * allocList)
2008 {
2009 RF_DagNode_t *blockNode, *unblockNode, *termNode;
2010 RF_DagNode_t *nodes, *wndNode, *wmirNode;
2011 int nWndNodes, nWmirNodes, i;
2012 RF_ReconUnitNum_t which_ru;
2013 RF_PhysDiskAddr_t *pda, *pdaP;
2014 RF_StripeNum_t parityStripeID;
2015
2016 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
2017 asmap->raidAddress, &which_ru);
2018 if (rf_dagDebug) {
2019 printf("[Creating RAID level 1 write DAG]\n");
2020 }
2021 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; /* 2 implies access not
2022 * SU aligned */
2023 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
2024
2025 /* alloc the Wnd nodes and the Wmir node */
2026 if (asmap->numDataFailed == 1)
2027 nWndNodes--;
2028 if (asmap->numParityFailed == 1)
2029 nWmirNodes--;
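
	/* in degraded mode the write to the failed copy is simply elided:
	 * the write to the surviving copy of the mirror pair suffices */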
2030
2031 /* total number of nodes = nWndNodes + nWmirNodes + (block + unblock +
2032 * terminator) */
2033 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
2034 i = 0;
2035 wndNode = &nodes[i];
2036 i += nWndNodes;
2037 wmirNode = &nodes[i];
2038 i += nWmirNodes;
2039 blockNode = &nodes[i];
2040 i += 1;
2041 unblockNode = &nodes[i];
2042 i += 1;
2043 termNode = &nodes[i];
2044 i += 1;
2045 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
2046
2047 /* this dag can commit immediately */
2048 dag_h->numCommitNodes = 0;
2049 dag_h->numCommits = 0;
2050 dag_h->numSuccedents = 1;
2051
2052 	/* initialize the block, unblock, and term nodes */
2053 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Nil", allocList);
2054 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
2055 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
2056
2057 /* initialize the wnd nodes */
2058 if (nWndNodes > 0) {
2059 pda = asmap->physInfo;
2060 for (i = 0; i < nWndNodes; i++) {
2061 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
2062 RF_ASSERT(pda != NULL);
2063 wndNode[i].params[0].p = pda;
2064 wndNode[i].params[1].p = pda->bufPtr;
2065 wndNode[i].params[2].v = parityStripeID;
2066 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2067 pda = pda->next;
2068 }
2069 RF_ASSERT(pda == NULL);
2070 }
2071 /* initialize the mirror nodes */
2072 if (nWmirNodes > 0) {
2073 pda = asmap->physInfo;
2074 pdaP = asmap->parityInfo;
2075 for (i = 0; i < nWmirNodes; i++) {
2076 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
2077 RF_ASSERT(pda != NULL);
2078 wmirNode[i].params[0].p = pdaP;
2079 wmirNode[i].params[1].p = pda->bufPtr;
2080 wmirNode[i].params[2].v = parityStripeID;
2081 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2082 pda = pda->next;
2083 pdaP = pdaP->next;
2084 }
2085 RF_ASSERT(pda == NULL);
2086 RF_ASSERT(pdaP == NULL);
2087 }
2088 /* link the header node to the block node */
2089 RF_ASSERT(dag_h->numSuccedents == 1);
2090 RF_ASSERT(blockNode->numAntecedents == 0);
2091 dag_h->succedents[0] = blockNode;
2092
2093 /* link the block node to the write nodes */
2094 RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
2095 for (i = 0; i < nWndNodes; i++) {
2096 RF_ASSERT(wndNode[i].numAntecedents == 1);
2097 blockNode->succedents[i] = &wndNode[i];
2098 wndNode[i].antecedents[0] = blockNode;
2099 wndNode[i].antType[0] = rf_control;
2100 }
2101 for (i = 0; i < nWmirNodes; i++) {
2102 RF_ASSERT(wmirNode[i].numAntecedents == 1);
2103 blockNode->succedents[i + nWndNodes] = &wmirNode[i];
2104 wmirNode[i].antecedents[0] = blockNode;
2105 wmirNode[i].antType[0] = rf_control;
2106 }
2107
2108 /* link the write nodes to the unblock node */
2109 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
2110 for (i = 0; i < nWndNodes; i++) {
2111 RF_ASSERT(wndNode[i].numSuccedents == 1);
2112 wndNode[i].succedents[0] = unblockNode;
2113 unblockNode->antecedents[i] = &wndNode[i];
2114 unblockNode->antType[i] = rf_control;
2115 }
2116 for (i = 0; i < nWmirNodes; i++) {
2117 RF_ASSERT(wmirNode[i].numSuccedents == 1);
2118 wmirNode[i].succedents[0] = unblockNode;
2119 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
2120 unblockNode->antType[i + nWndNodes] = rf_control;
2121 }
2122
2123 /* link the unblock node to the term node */
2124 RF_ASSERT(unblockNode->numSuccedents == 1);
2125 RF_ASSERT(termNode->numAntecedents == 1);
2126 RF_ASSERT(termNode->numSuccedents == 0);
2127 unblockNode->succedents[0] = termNode;
2128 termNode->antecedents[0] = unblockNode;
2129 termNode->antType[0] = rf_control;
2130
2131 return;
2132 }
2133