1 /* $NetBSD: rf_dagffwr.c,v 1.5.6.1 2001/10/22 20:41:33 nathanw Exp $ */
2 /*
3 * Copyright (c) 1995 Carnegie-Mellon University.
4 * All rights reserved.
5 *
6 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
7 *
8 * Permission to use, copy, modify and distribute this software and
9 * its documentation is hereby granted, provided that both the copyright
10 * notice and this permission notice appear in all copies of the
11 * software, derivative works or modified versions, and any portions
12 * thereof, and that both notices appear in supporting documentation.
13 *
14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
17 *
18 * Carnegie Mellon requests users of this software to return to
19 *
20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
21 * School of Computer Science
22 * Carnegie Mellon University
23 * Pittsburgh PA 15213-3890
24 *
25 * any improvements or extensions that they make and grant Carnegie the
26 * rights to redistribute these changes.
27 */
28
29 /*
30 * rf_dagffwr.c
31 *
32 * code for creating fault-free write DAGs
33 *
34 */
35
36 #include <dev/raidframe/raidframevar.h>
37
38 #include "rf_raid.h"
39 #include "rf_dag.h"
40 #include "rf_dagutils.h"
41 #include "rf_dagfuncs.h"
42 #include "rf_debugMem.h"
43 #include "rf_dagffrd.h"
44 #include "rf_memchunk.h"
45 #include "rf_general.h"
46 #include "rf_dagffwr.h"
47
48 /******************************************************************************
49 *
50 * General comments on DAG creation:
51 *
52 * All DAGs in this file use roll-away error recovery. Each DAG has a single
53 * commit node, usually called "Cmt." If an error occurs before the Cmt node
54 * is reached, the execution engine will halt forward execution and work
55 * backward through the graph, executing the undo functions. Assuming that
56 * every node in the graph prior to the Cmt node is either atomic and undoable,
57 * or makes no changes to permanent state, the graph will fail atomically.
58 * If an error occurs after the Cmt node executes, the engine will roll-forward
59 * through the graph, blindly executing nodes until it reaches the end.
60 * If a graph reaches the end, it is assumed to have completed successfully.
61 *
62 * A graph has only 1 Cmt node.
63 *
64 */
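/*
 * Illustrative sketch (not part of RAIDframe): one way an execution engine
 * could apply the roll-away rule described above.  The helpers
 * roll_backward() and roll_forward() are hypothetical; only the dag_h
 * fields referenced here actually exist.
 *
 *	static void
 *	engine_recover(RF_DagHeader_t *dag_h)
 *	{
 *		if (dag_h->numCommits < dag_h->numCommitNodes) {
 *			// error before the Cmt node fired: undo completed
 *			// nodes in reverse order, leaving permanent state
 *			// unchanged (atomic failure)
 *			roll_backward(dag_h);
 *		} else {
 *			// Cmt already fired: blindly execute the remaining
 *			// nodes forward until the terminal node is reached
 *			roll_forward(dag_h);
 *		}
 *	}
 */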
65
66
67 /******************************************************************************
68 *
69 * The following wrappers map the standard DAG creation interface to the
70 * DAG creation routines. Additionally, these wrappers enable experimentation
71 * with new DAG structures by providing an extra level of indirection, allowing
72 * the DAG creation routines to be replaced at this single point.
73 */
74
75
76 void
77 rf_CreateNonRedundantWriteDAG(
78 RF_Raid_t * raidPtr,
79 RF_AccessStripeMap_t * asmap,
80 RF_DagHeader_t * dag_h,
81 void *bp,
82 RF_RaidAccessFlags_t flags,
83 RF_AllocListElem_t * allocList,
84 RF_IoType_t type)
85 {
86 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
87 RF_IO_TYPE_WRITE);
88 }
89
90 void
91 rf_CreateRAID0WriteDAG(
92 RF_Raid_t * raidPtr,
93 RF_AccessStripeMap_t * asmap,
94 RF_DagHeader_t * dag_h,
95 void *bp,
96 RF_RaidAccessFlags_t flags,
97 RF_AllocListElem_t * allocList,
98 RF_IoType_t type)
99 {
100 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
101 RF_IO_TYPE_WRITE);
102 }
103
104 void
105 rf_CreateSmallWriteDAG(
106 RF_Raid_t * raidPtr,
107 RF_AccessStripeMap_t * asmap,
108 RF_DagHeader_t * dag_h,
109 void *bp,
110 RF_RaidAccessFlags_t flags,
111 RF_AllocListElem_t * allocList)
112 {
113 /* "normal" rollaway */
114 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
115 &rf_xorFuncs, NULL);
116 }
117
118 void
119 rf_CreateLargeWriteDAG(
120 RF_Raid_t * raidPtr,
121 RF_AccessStripeMap_t * asmap,
122 RF_DagHeader_t * dag_h,
123 void *bp,
124 RF_RaidAccessFlags_t flags,
125 RF_AllocListElem_t * allocList)
126 {
127 /* "normal" rollaway */
128 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
129 1, rf_RegularXorFunc, RF_TRUE);
130 }
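/*
 * Because the wrappers above are the single point at which creation routines
 * are selected, an experimental DAG type can be substituted without touching
 * any caller.  A minimal, hypothetical sketch (rf_ExperimentalLargeWriteDAG
 * is not a real RAIDframe routine):
 *
 *	void
 *	rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
 *	    RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
 *	    RF_AllocListElem_t *allocList)
 *	{
 *		// route all large writes to the experimental routine
 *		rf_ExperimentalLargeWriteDAG(raidPtr, asmap, dag_h, bp,
 *		    flags, allocList);
 *	}
 */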
131
132
133 /******************************************************************************
134 *
135 * DAG creation code begins here
136 */
137
138
139 /******************************************************************************
140 *
141 * creates a DAG to perform a large-write operation:
142 *
143 * / Rod \ / Wnd \
144 * H -- block- Rod - Xor - Cmt - Wnd --- T
145 * \ Rod / \ Wnp /
146 * \[Wnq]/
147 *
148 * The XOR node also does the Q calculation in the P+Q architecture.
149 * All nodes before the commit node (Cmt) are assumed to be atomic and
150 * undoable, or to make no changes to permanent state.
151 *
152 * Rod = read old data
153 * Cmt = commit node
154 * Wnp = write new parity
155 * Wnd = write new data
156 * Wnq = write new "q"
157 * [] denotes optional segments in the graph
158 *
159 * Parameters: raidPtr - description of the physical array
160 * asmap - logical & physical addresses for this access
161 * bp - buffer ptr (holds write data)
162 * flags - general flags (e.g. disk locking)
163 * allocList - list of memory allocated in DAG creation
164 * nfaults - number of faults array can tolerate
165 * (equal to # redundancy units in stripe)
166 * redfuncs - list of redundancy generating functions
167 *
168 *****************************************************************************/
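/*
 * Worked example of the computation above, for an illustrative 4 data +
 * 1 parity RAID-5 stripe in which the access covers data units D0..D2 and
 * D3 is untouched:
 *
 *	a Rod node reads the old contents of D3, and the Xr node computes
 *
 *		Pnew = D0new ^ D1new ^ D2new ^ D3old
 *
 *	Cmt releases the Wnd/Wnp writes only after Pnew is available, so no
 *	disk is modified unless the whole stripe can be made consistent.
 */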
169
170 void
171 rf_CommonCreateLargeWriteDAG(
172 RF_Raid_t * raidPtr,
173 RF_AccessStripeMap_t * asmap,
174 RF_DagHeader_t * dag_h,
175 void *bp,
176 RF_RaidAccessFlags_t flags,
177 RF_AllocListElem_t * allocList,
178 int nfaults,
179 int (*redFunc) (RF_DagNode_t *),
180 int allowBufferRecycle)
181 {
182 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
183 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
184 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
185 RF_AccessStripeMapHeader_t *new_asm_h[2];
186 RF_StripeNum_t parityStripeID;
187 char *sosBuffer, *eosBuffer;
188 RF_ReconUnitNum_t which_ru;
189 RF_RaidLayout_t *layoutPtr;
190 RF_PhysDiskAddr_t *pda;
191
192 layoutPtr = &(raidPtr->Layout);
193 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress,
194 &which_ru);
195
196 if (rf_dagDebug) {
197 printf("[Creating large-write DAG]\n");
198 }
199 dag_h->creator = "LargeWriteDAG";
200
201 dag_h->numCommitNodes = 1;
202 dag_h->numCommits = 0;
203 dag_h->numSuccedents = 1;
204
205 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
206 nWndNodes = asmap->numStripeUnitsAccessed;
207 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
208 (RF_DagNode_t *), allocList);
209 i = 0;
210 wndNodes = &nodes[i];
211 i += nWndNodes;
212 xorNode = &nodes[i];
213 i += 1;
214 wnpNode = &nodes[i];
215 i += 1;
216 blockNode = &nodes[i];
217 i += 1;
218 commitNode = &nodes[i];
219 i += 1;
220 termNode = &nodes[i];
221 i += 1;
222 if (nfaults == 2) {
223 wnqNode = &nodes[i];
224 i += 1;
225 } else {
226 wnqNode = NULL;
227 }
228 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h,
229 &nRodNodes, &sosBuffer, &eosBuffer, allocList);
230 if (nRodNodes > 0) {
231 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
232 (RF_DagNode_t *), allocList);
233 } else {
234 rodNodes = NULL;
235 }
236
237 /* begin node initialization */
238 if (nRodNodes > 0) {
239 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
240 NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
241 } else {
242 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
243 NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
244 }
245
246 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL,
247 nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList);
248 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL,
249 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
250
251 /* initialize the Rod nodes */
252 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
253 if (new_asm_h[asmNum]) {
254 pda = new_asm_h[asmNum]->stripeMap->physInfo;
255 while (pda) {
256 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,
257 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
258 "Rod", allocList);
259 rodNodes[nodeNum].params[0].p = pda;
260 rodNodes[nodeNum].params[1].p = pda->bufPtr;
261 rodNodes[nodeNum].params[2].v = parityStripeID;
262 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
263 0, 0, which_ru);
264 nodeNum++;
265 pda = pda->next;
266 }
267 }
268 }
269 RF_ASSERT(nodeNum == nRodNodes);
270
271 /* initialize the wnd nodes */
272 pda = asmap->physInfo;
273 for (i = 0; i < nWndNodes; i++) {
274 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
275 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
276 RF_ASSERT(pda != NULL);
277 wndNodes[i].params[0].p = pda;
278 wndNodes[i].params[1].p = pda->bufPtr;
279 wndNodes[i].params[2].v = parityStripeID;
280 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
281 pda = pda->next;
282 }
283
284 /* initialize the redundancy node */
285 if (nRodNodes > 0) {
286 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
287 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
288 "Xr ", allocList);
289 } else {
290 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
291 1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
292 }
293 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
294 for (i = 0; i < nWndNodes; i++) {
295 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */
296 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */
297 }
298 for (i = 0; i < nRodNodes; i++) {
299 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */
300 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */
301 }
302 /* xor node needs to get at RAID information */
303 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
304
305 /*
306 * Look for an Rod node that reads a complete SU. If none, alloc a buffer
307 * to receive the parity info. Note that we can't use a new data buffer
308 * because it will not have gotten written when the xor occurs.
309 */
310 if (allowBufferRecycle) {
311 for (i = 0; i < nRodNodes; i++) {
312 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
313 break;
314 }
315 }
316 if ((!allowBufferRecycle) || (i == nRodNodes)) {
317 RF_CallocAndAdd(xorNode->results[0], 1,
318 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
319 (void *), allocList);
320 } else {
321 xorNode->results[0] = rodNodes[i].params[1].p;
322 }
323
324 /* initialize the Wnp node */
325 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
326 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
327 wnpNode->params[0].p = asmap->parityInfo;
328 wnpNode->params[1].p = xorNode->results[0];
329 wnpNode->params[2].v = parityStripeID;
330 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
331 /* parityInfo must describe entire parity unit */
332 RF_ASSERT(asmap->parityInfo->next == NULL);
333
334 if (nfaults == 2) {
335 /*
336 * We never try to recycle a buffer for the Q calculation
337 * in addition to the parity. This would cause two buffers
338 * to get smashed during the P and Q calculation, guaranteeing
339 * one would be wrong.
340 */
341 RF_CallocAndAdd(xorNode->results[1], 1,
342 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
343 (void *), allocList);
344 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
345 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
346 wnqNode->params[0].p = asmap->qInfo;
347 wnqNode->params[1].p = xorNode->results[1];
348 wnqNode->params[2].v = parityStripeID;
349 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
350 /* parityInfo must describe entire parity unit */
351 RF_ASSERT(asmap->parityInfo->next == NULL);
352 }
353 /*
354 * Connect nodes to form graph.
355 */
356
357 /* connect dag header to block node */
358 RF_ASSERT(blockNode->numAntecedents == 0);
359 dag_h->succedents[0] = blockNode;
360
361 if (nRodNodes > 0) {
362 /* connect the block node to the Rod nodes */
363 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
364 RF_ASSERT(xorNode->numAntecedents == nRodNodes);
365 for (i = 0; i < nRodNodes; i++) {
366 RF_ASSERT(rodNodes[i].numAntecedents == 1);
367 blockNode->succedents[i] = &rodNodes[i];
368 rodNodes[i].antecedents[0] = blockNode;
369 rodNodes[i].antType[0] = rf_control;
370
371 /* connect the Rod nodes to the Xor node */
372 RF_ASSERT(rodNodes[i].numSuccedents == 1);
373 rodNodes[i].succedents[0] = xorNode;
374 xorNode->antecedents[i] = &rodNodes[i];
375 xorNode->antType[i] = rf_trueData;
376 }
377 } else {
378 /* connect the block node to the Xor node */
379 RF_ASSERT(blockNode->numSuccedents == 1);
380 RF_ASSERT(xorNode->numAntecedents == 1);
381 blockNode->succedents[0] = xorNode;
382 xorNode->antecedents[0] = blockNode;
383 xorNode->antType[0] = rf_control;
384 }
385
386 /* connect the xor node to the commit node */
387 RF_ASSERT(xorNode->numSuccedents == 1);
388 RF_ASSERT(commitNode->numAntecedents == 1);
389 xorNode->succedents[0] = commitNode;
390 commitNode->antecedents[0] = xorNode;
391 commitNode->antType[0] = rf_control;
392
393 /* connect the commit node to the write nodes */
394 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
395 for (i = 0; i < nWndNodes; i++) {
396 RF_ASSERT(wndNodes[i].numAntecedents == 1);
397 commitNode->succedents[i] = &wndNodes[i];
398 wndNodes[i].antecedents[0] = commitNode;
399 wndNodes[i].antType[0] = rf_control;
400 }
401 RF_ASSERT(wnpNode->numAntecedents == 1);
402 commitNode->succedents[nWndNodes] = wnpNode;
403 wnpNode->antecedents[0] = commitNode;
404 wnpNode->antType[0] = rf_trueData;
405 if (nfaults == 2) {
406 RF_ASSERT(wnqNode->numAntecedents == 1);
407 commitNode->succedents[nWndNodes + 1] = wnqNode;
408 wnqNode->antecedents[0] = commitNode;
409 wnqNode->antType[0] = rf_trueData;
410 }
411 /* connect the write nodes to the term node */
412 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
413 RF_ASSERT(termNode->numSuccedents == 0);
414 for (i = 0; i < nWndNodes; i++) {
415 RF_ASSERT(wndNodes[i].numSuccedents == 1);
416 wndNodes[i].succedents[0] = termNode;
417 termNode->antecedents[i] = &wndNodes[i];
418 termNode->antType[i] = rf_control;
419 }
420 RF_ASSERT(wnpNode->numSuccedents == 1);
421 wnpNode->succedents[0] = termNode;
422 termNode->antecedents[nWndNodes] = wnpNode;
423 termNode->antType[nWndNodes] = rf_control;
424 if (nfaults == 2) {
425 RF_ASSERT(wnqNode->numSuccedents == 1);
426 wnqNode->succedents[0] = termNode;
427 termNode->antecedents[nWndNodes + 1] = wnqNode;
428 termNode->antType[nWndNodes + 1] = rf_control;
429 }
430 }
431 /******************************************************************************
432 *
433 * creates a DAG to perform a small-write operation (either raid 5 or pq),
434 * which is as follows:
435 *
436 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
437 * \- Rod X / \----> Wnd [Und]-/
438 * [\- Rod X / \---> Wnd [Und]-/]
439 * [\- Roq -> Q / \--> Wnq [Unq]-/]
440 *
441 * Rop = read old parity
442 * Rod = read old data
443 * Roq = read old "q"
444 * Cmt = commit node
445 * Und = unlock data disk
446 * Unp = unlock parity disk
447 * Unq = unlock q disk
448 * Wnp = write new parity
449 * Wnd = write new data
450 * Wnq = write new "q"
451 * [ ] denotes optional segments in the graph
452 *
453 * Parameters: raidPtr - description of the physical array
454 * asmap - logical & physical addresses for this access
455 * bp - buffer ptr (holds write data)
456 * flags - general flags (e.g. disk locking)
457 * allocList - list of memory allocated in DAG creation
458 * pfuncs - list of parity generating functions
459 * qfuncs - list of q generating functions
460 *
461 * A NULL qfuncs indicates a single-fault-tolerant array.
462 *****************************************************************************/
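/*
 * The small-write graph implements the usual read-modify-write identity;
 * with a single data unit and a single parity unit (unit counts illustrative)
 * the Xor node computes
 *
 *	Pnew = Pold ^ Dold ^ Dnew
 *
 * which is why each Rod/Rop buffer and each Wnd buffer appears in the Xor
 * node's parameter list, and why the old-parity buffer can safely be reused
 * as the Xor target.  The same wiring, with the supplied Q function in place
 * of XOR, produces Qnew in the two-fault-tolerant (P+Q) case.
 */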
463
464 void
465 rf_CommonCreateSmallWriteDAG(
466 RF_Raid_t * raidPtr,
467 RF_AccessStripeMap_t * asmap,
468 RF_DagHeader_t * dag_h,
469 void *bp,
470 RF_RaidAccessFlags_t flags,
471 RF_AllocListElem_t * allocList,
472 RF_RedFuncs_t * pfuncs,
473 RF_RedFuncs_t * qfuncs)
474 {
475 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
476 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
477 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
478 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
479 int i, j, nNodes, totalNumNodes, lu_flag;
480 RF_ReconUnitNum_t which_ru;
481 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
482 int (*qfunc) (RF_DagNode_t *);
483 int numDataNodes, numParityNodes;
484 RF_StripeNum_t parityStripeID;
485 RF_PhysDiskAddr_t *pda;
486 char *name, *qname;
487 long nfaults;
488
489 nfaults = qfuncs ? 2 : 1;
490 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
491
492 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
493 asmap->raidAddress, &which_ru);
494 pda = asmap->physInfo;
495 numDataNodes = asmap->numStripeUnitsAccessed;
496 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
497
498 if (rf_dagDebug) {
499 printf("[Creating small-write DAG]\n");
500 }
501 RF_ASSERT(numDataNodes > 0);
502 dag_h->creator = "SmallWriteDAG";
503
504 dag_h->numCommitNodes = 1;
505 dag_h->numCommits = 0;
506 dag_h->numSuccedents = 1;
507
508 /*
509 * DAG creation occurs in four steps:
510 * 1. count the number of nodes in the DAG
511 * 2. create the nodes
512 * 3. initialize the nodes
513 * 4. connect the nodes
514 */
515
516 /*
517 * Step 1. compute number of nodes in the graph
518 */
519
520 /* Number of nodes: a read and a write for each data unit; a redundancy
521 * computation node for each parity unit (nfaults * nparity); a read and
522 * a write for each parity unit, for each of the nfaults redundancy types;
523 * a block, a commit, and a terminate node (3); and, if atomic RMW,
524 * an unlock node for each data unit and redundancy unit. */
525 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
526 + (nfaults * 2 * numParityNodes) + 3;
527 if (lu_flag) {
528 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
529 }
530 /*
531 * Step 2. create the nodes
532 */
533 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
534 (RF_DagNode_t *), allocList);
535 i = 0;
536 blockNode = &nodes[i];
537 i += 1;
538 commitNode = &nodes[i];
539 i += 1;
540 readDataNodes = &nodes[i];
541 i += numDataNodes;
542 readParityNodes = &nodes[i];
543 i += numParityNodes;
544 writeDataNodes = &nodes[i];
545 i += numDataNodes;
546 writeParityNodes = &nodes[i];
547 i += numParityNodes;
548 xorNodes = &nodes[i];
549 i += numParityNodes;
550 termNode = &nodes[i];
551 i += 1;
552 if (lu_flag) {
553 unlockDataNodes = &nodes[i];
554 i += numDataNodes;
555 unlockParityNodes = &nodes[i];
556 i += numParityNodes;
557 } else {
558 unlockDataNodes = unlockParityNodes = NULL;
559 }
560 if (nfaults == 2) {
561 readQNodes = &nodes[i];
562 i += numParityNodes;
563 writeQNodes = &nodes[i];
564 i += numParityNodes;
565 qNodes = &nodes[i];
566 i += numParityNodes;
567 if (lu_flag) {
568 unlockQNodes = &nodes[i];
569 i += numParityNodes;
570 } else {
571 unlockQNodes = NULL;
572 }
573 } else {
574 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
575 }
576 RF_ASSERT(i == totalNumNodes);
577
578 /*
579 * Step 3. initialize the nodes
580 */
581 /* initialize block node (Nil) */
582 nNodes = numDataNodes + (nfaults * numParityNodes);
583 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
584 NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
585
586 /* initialize commit node (Cmt) */
587 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
588 NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);
589
590 /* initialize terminate node (Trm) */
591 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
592 NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
593
594 /* initialize nodes which read old data (Rod) */
595 for (i = 0; i < numDataNodes; i++) {
596 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
597 rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h,
598 "Rod", allocList);
599 RF_ASSERT(pda != NULL);
600 /* physical disk addr desc */
601 readDataNodes[i].params[0].p = pda;
602 /* buffer to hold old data */
603 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
604 dag_h, pda, allocList);
605 readDataNodes[i].params[2].v = parityStripeID;
606 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
607 lu_flag, 0, which_ru);
608 pda = pda->next;
609 for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
610 readDataNodes[i].propList[j] = NULL;
611 }
612 }
613
614 /* initialize nodes which read old parity (Rop) */
615 pda = asmap->parityInfo;
616 i = 0;
617 for (i = 0; i < numParityNodes; i++) {
618 RF_ASSERT(pda != NULL);
619 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc,
620 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4,
621 0, dag_h, "Rop", allocList);
622 readParityNodes[i].params[0].p = pda;
623 /* buffer to hold old parity */
624 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
625 dag_h, pda, allocList);
626 readParityNodes[i].params[2].v = parityStripeID;
627 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
628 lu_flag, 0, which_ru);
629 pda = pda->next;
630 for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
631 readParityNodes[i].propList[j] = NULL;
632 }
633 }
634
635 /* initialize nodes which read old Q (Roq) */
636 if (nfaults == 2) {
637 pda = asmap->qInfo;
638 for (i = 0; i < numParityNodes; i++) {
639 RF_ASSERT(pda != NULL);
640 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
641 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
642 readQNodes[i].params[0].p = pda;
643 /* buffer to hold old Q */
644 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda,
645 allocList);
646 readQNodes[i].params[2].v = parityStripeID;
647 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
648 lu_flag, 0, which_ru);
649 pda = pda->next;
650 for (j = 0; j < readQNodes[i].numSuccedents; j++) {
651 readQNodes[i].propList[j] = NULL;
652 }
653 }
654 }
655 /* initialize nodes which write new data (Wnd) */
656 pda = asmap->physInfo;
657 for (i = 0; i < numDataNodes; i++) {
658 RF_ASSERT(pda != NULL);
659 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
660 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
661 "Wnd", allocList);
662 /* physical disk addr desc */
663 writeDataNodes[i].params[0].p = pda;
664 /* buffer holding new data to be written */
665 writeDataNodes[i].params[1].p = pda->bufPtr;
666 writeDataNodes[i].params[2].v = parityStripeID;
667 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
668 0, 0, which_ru);
669 if (lu_flag) {
670 /* initialize node to unlock the disk queue */
671 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
672 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
673 "Und", allocList);
674 /* physical disk addr desc */
675 unlockDataNodes[i].params[0].p = pda;
676 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
677 0, lu_flag, which_ru);
678 }
679 pda = pda->next;
680 }
681
682 /*
683 * Initialize nodes which compute new parity and Q.
684 */
685 /*
686 * We use the simple XOR func in the double-XOR case, and when
687 * we're accessing only a portion of one stripe unit. The distinction
688 * between the two is that the regular XOR func assumes that the targbuf
689 * is a full SU in size, and examines the pda associated with the buffer
690 * to decide where within the buffer to XOR the data, whereas
691 * the simple XOR func just XORs the data into the start of the buffer.
692 */
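/*
 * A rough sketch of the distinction, using the pfuncs members named above
 * (the offset arithmetic is illustrative, not the exact implementation):
 *
 *	pfuncs->simple:   dst[i]       ^= src[i];   for i in [0, len)
 *	pfuncs->regular:  dst[off + i] ^= src[i];   off derived from the
 *	                                            source pda's position
 *	                                            within the full SU
 */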
693 if ((numParityNodes == 2) || ((numDataNodes == 1)
694 && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
695 func = pfuncs->simple;
696 undoFunc = rf_NullNodeUndoFunc;
697 name = pfuncs->SimpleName;
698 if (qfuncs) {
699 qfunc = qfuncs->simple;
700 qname = qfuncs->SimpleName;
701 } else {
702 qfunc = NULL;
703 qname = NULL;
704 }
705 } else {
706 func = pfuncs->regular;
707 undoFunc = rf_NullNodeUndoFunc;
708 name = pfuncs->RegularName;
709 if (qfuncs) {
710 qfunc = qfuncs->regular;
711 qname = qfuncs->RegularName;
712 } else {
713 qfunc = NULL;
714 qname = NULL;
715 }
716 }
717 /*
718 * Initialize the xor nodes: params are {pda,buf}
719 * from {Rod,Wnd,Rop} nodes, and raidPtr
720 */
721 if (numParityNodes == 2) {
722 /* double-xor case */
723 for (i = 0; i < numParityNodes; i++) {
724 /* note: no wakeup func for xor */
725 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL,
726 1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList);
727 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
728 xorNodes[i].params[0] = readDataNodes[i].params[0];
729 xorNodes[i].params[1] = readDataNodes[i].params[1];
730 xorNodes[i].params[2] = readParityNodes[i].params[0];
731 xorNodes[i].params[3] = readParityNodes[i].params[1];
732 xorNodes[i].params[4] = writeDataNodes[i].params[0];
733 xorNodes[i].params[5] = writeDataNodes[i].params[1];
734 xorNodes[i].params[6].p = raidPtr;
735 /* use old parity buf as target buf */
736 xorNodes[i].results[0] = readParityNodes[i].params[1].p;
737 if (nfaults == 2) {
738 /* note: no wakeup func for qor */
739 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
740 (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList);
741 qNodes[i].params[0] = readDataNodes[i].params[0];
742 qNodes[i].params[1] = readDataNodes[i].params[1];
743 qNodes[i].params[2] = readQNodes[i].params[0];
744 qNodes[i].params[3] = readQNodes[i].params[1];
745 qNodes[i].params[4] = writeDataNodes[i].params[0];
746 qNodes[i].params[5] = writeDataNodes[i].params[1];
747 qNodes[i].params[6].p = raidPtr;
748 /* use old Q buf as target buf */
749 qNodes[i].results[0] = readQNodes[i].params[1].p;
750 }
751 }
752 } else {
753 /* there is only one xor node in this case */
754 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1,
755 (numDataNodes + numParityNodes),
756 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
757 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
758 for (i = 0; i < numDataNodes + 1; i++) {
759 /* set up params for the Rod nodes plus, on the last iteration, the Rop node that immediately follows them in the nodes array */
760 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
761 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */
762 }
763 for (i = 0; i < numDataNodes; i++) {
764 /* set up params related to Wnd and Wnp nodes */
765 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */
766 writeDataNodes[i].params[0];
767 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */
768 writeDataNodes[i].params[1];
769 }
770 /* xor node needs to get at RAID information */
771 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
772 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
773 if (nfaults == 2) {
774 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
775 (numDataNodes + numParityNodes),
776 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
777 qname, allocList);
778 for (i = 0; i < numDataNodes; i++) {
779 /* set up params related to Rod */
780 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
781 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */
782 }
783 /* and read old q */
784 qNodes[0].params[2 * numDataNodes + 0] = /* pda */
785 readQNodes[0].params[0];
786 qNodes[0].params[2 * numDataNodes + 1] = /* buffer ptr */
787 readQNodes[0].params[1];
788 for (i = 0; i < numDataNodes; i++) {
789 /* set up params related to Wnd nodes */
790 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */
791 writeDataNodes[i].params[0];
792 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */
793 writeDataNodes[i].params[1];
794 }
795 /* xor node needs to get at RAID information */
796 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
797 qNodes[0].results[0] = readQNodes[0].params[1].p;
798 }
799 }
800
801 /* initialize nodes which write new parity (Wnp) */
802 pda = asmap->parityInfo;
803 for (i = 0; i < numParityNodes; i++) {
804 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
805 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
806 "Wnp", allocList);
807 RF_ASSERT(pda != NULL);
808 writeParityNodes[i].params[0].p = pda; /* physical disk addr
809 * desc */
810 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for
811 * parity write
812 * operation */
813 writeParityNodes[i].params[2].v = parityStripeID;
814 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
815 0, 0, which_ru);
816 if (lu_flag) {
817 /* initialize node to unlock the disk queue */
818 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
819 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
820 "Unp", allocList);
821 unlockParityNodes[i].params[0].p = pda; /* physical disk addr
822 * desc */
823 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
824 0, lu_flag, which_ru);
825 }
826 pda = pda->next;
827 }
828
829 /* initialize nodes which write new Q (Wnq) */
830 if (nfaults == 2) {
831 pda = asmap->qInfo;
832 for (i = 0; i < numParityNodes; i++) {
833 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
834 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
835 "Wnq", allocList);
836 RF_ASSERT(pda != NULL);
837 writeQNodes[i].params[0].p = pda; /* physical disk addr
838 * desc */
839 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for
840 * q write
841 * operation */
842 writeQNodes[i].params[2].v = parityStripeID;
843 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
844 0, 0, which_ru);
845 if (lu_flag) {
846 /* initialize node to unlock the disk queue */
847 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
848 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
849 "Unq", allocList);
850 unlockQNodes[i].params[0].p = pda; /* physical disk addr
851 * desc */
852 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
853 0, lu_flag, which_ru);
854 }
855 pda = pda->next;
856 }
857 }
858 /*
859 * Step 4. connect the nodes.
860 */
861
862 /* connect header to block node */
863 dag_h->succedents[0] = blockNode;
864
865 /* connect block node to read old data nodes */
866 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
867 for (i = 0; i < numDataNodes; i++) {
868 blockNode->succedents[i] = &readDataNodes[i];
869 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
870 readDataNodes[i].antecedents[0] = blockNode;
871 readDataNodes[i].antType[0] = rf_control;
872 }
873
874 /* connect block node to read old parity nodes */
875 for (i = 0; i < numParityNodes; i++) {
876 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
877 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
878 readParityNodes[i].antecedents[0] = blockNode;
879 readParityNodes[i].antType[0] = rf_control;
880 }
881
882 /* connect block node to read old Q nodes */
883 if (nfaults == 2) {
884 for (i = 0; i < numParityNodes; i++) {
885 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
886 RF_ASSERT(readQNodes[i].numAntecedents == 1);
887 readQNodes[i].antecedents[0] = blockNode;
888 readQNodes[i].antType[0] = rf_control;
889 }
890 }
891 /* connect read old data nodes to xor nodes */
892 for (i = 0; i < numDataNodes; i++) {
893 RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
894 for (j = 0; j < numParityNodes; j++) {
895 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
896 readDataNodes[i].succedents[j] = &xorNodes[j];
897 xorNodes[j].antecedents[i] = &readDataNodes[i];
898 xorNodes[j].antType[i] = rf_trueData;
899 }
900 }
901
902 /* connect read old data nodes to q nodes */
903 if (nfaults == 2) {
904 for (i = 0; i < numDataNodes; i++) {
905 for (j = 0; j < numParityNodes; j++) {
906 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
907 readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
908 qNodes[j].antecedents[i] = &readDataNodes[i];
909 qNodes[j].antType[i] = rf_trueData;
910 }
911 }
912 }
913 /* connect read old parity nodes to xor nodes */
914 for (i = 0; i < numParityNodes; i++) {
915 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
916 for (j = 0; j < numParityNodes; j++) {
917 readParityNodes[i].succedents[j] = &xorNodes[j];
918 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
919 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
920 }
921 }
922
923 /* connect read old q nodes to q nodes */
924 if (nfaults == 2) {
925 for (i = 0; i < numParityNodes; i++) {
926 RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
927 for (j = 0; j < numParityNodes; j++) {
928 readQNodes[i].succedents[j] = &qNodes[j];
929 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
930 qNodes[j].antType[numDataNodes + i] = rf_trueData;
931 }
932 }
933 }
934 /* connect xor nodes to commit node */
935 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
936 for (i = 0; i < numParityNodes; i++) {
937 RF_ASSERT(xorNodes[i].numSuccedents == 1);
938 xorNodes[i].succedents[0] = commitNode;
939 commitNode->antecedents[i] = &xorNodes[i];
940 commitNode->antType[i] = rf_control;
941 }
942
943 /* connect q nodes to commit node */
944 if (nfaults == 2) {
945 for (i = 0; i < numParityNodes; i++) {
946 RF_ASSERT(qNodes[i].numSuccedents == 1);
947 qNodes[i].succedents[0] = commitNode;
948 commitNode->antecedents[i + numParityNodes] = &qNodes[i];
949 commitNode->antType[i + numParityNodes] = rf_control;
950 }
951 }
952 /* connect commit node to write nodes */
953 RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
954 for (i = 0; i < numDataNodes; i++) {
955 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
956 commitNode->succedents[i] = &writeDataNodes[i];
957 writeDataNodes[i].antecedents[0] = commitNode;
958 writeDataNodes[i].antType[0] = rf_trueData;
959 }
960 for (i = 0; i < numParityNodes; i++) {
961 RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
962 commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
963 writeParityNodes[i].antecedents[0] = commitNode;
964 writeParityNodes[i].antType[0] = rf_trueData;
965 }
966 if (nfaults == 2) {
967 for (i = 0; i < numParityNodes; i++) {
968 RF_ASSERT(writeQNodes[i].numAntecedents == 1);
969 commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
970 writeQNodes[i].antecedents[0] = commitNode;
971 writeQNodes[i].antType[0] = rf_trueData;
972 }
973 }
974 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
975 RF_ASSERT(termNode->numSuccedents == 0);
976 for (i = 0; i < numDataNodes; i++) {
977 if (lu_flag) {
978 /* connect write new data nodes to unlock nodes */
979 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
980 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
981 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
982 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
983 unlockDataNodes[i].antType[0] = rf_control;
984
985 /* connect unlock nodes to term node */
986 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
987 unlockDataNodes[i].succedents[0] = termNode;
988 termNode->antecedents[i] = &unlockDataNodes[i];
989 termNode->antType[i] = rf_control;
990 } else {
991 /* connect write new data nodes to term node */
992 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
993 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
994 writeDataNodes[i].succedents[0] = termNode;
995 termNode->antecedents[i] = &writeDataNodes[i];
996 termNode->antType[i] = rf_control;
997 }
998 }
999
1000 for (i = 0; i < numParityNodes; i++) {
1001 if (lu_flag) {
1002 /* connect write new parity nodes to unlock nodes */
1003 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1004 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1005 writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
1006 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
1007 unlockParityNodes[i].antType[0] = rf_control;
1008
1009 /* connect unlock nodes to term node */
1010 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1011 unlockParityNodes[i].succedents[0] = termNode;
1012 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
1013 termNode->antType[numDataNodes + i] = rf_control;
1014 } else {
1015 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1016 writeParityNodes[i].succedents[0] = termNode;
1017 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
1018 termNode->antType[numDataNodes + i] = rf_control;
1019 }
1020 }
1021
1022 if (nfaults == 2) {
1023 for (i = 0; i < numParityNodes; i++) {
1024 if (lu_flag) {
1025 /* connect write new Q nodes to unlock nodes */
1026 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1027 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1028 writeQNodes[i].succedents[0] = &unlockQNodes[i];
1029 unlockQNodes[i].antecedents[0] = &writeQNodes[i];
1030 unlockQNodes[i].antType[0] = rf_control;
1031
1032 /* connect unlock nodes to term node */
1033 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1034 unlockQNodes[i].succedents[0] = termNode;
1035 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
1036 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1037 } else {
1038 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1039 writeQNodes[i].succedents[0] = termNode;
1040 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
1041 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1042 }
1043 }
1044 }
1045 }
1046
1047
1048 /******************************************************************************
1049 * create a write graph (fault-free or degraded) for RAID level 1
1050 *
1051 * Hdr -> Commit -> Wpd -> Nil -> Trm
1052 * -> Wsd ->
1053 *
1054 * The "Wpd" node writes data to the primary copy in the mirror pair
1055 * The "Wsd" node writes data to the secondary copy in the mirror pair
1056 *
1057 * Parameters: raidPtr - description of the physical array
1058 * asmap - logical & physical addresses for this access
1059 * bp - buffer ptr (holds write data)
1060 * flags - general flags (e.g. disk locking)
1061 * allocList - list of memory allocated in DAG creation
1062 *****************************************************************************/
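/*
 * Note on the mirror writes below: Wpd and Wsd write the same source buffer;
 * only the physical disk address differs.  A condensed view of the per-unit
 * setup performed in the loops that follow (field names as in the code):
 *
 *	Wpd: params = { pda  (primary addr), pda->bufPtr, parityStripeID, ... }
 *	Wsd: params = { pdaP (mirror  addr), pda->bufPtr, parityStripeID, ... }
 *
 * The DAG commits immediately (Cmt is its first node), so an error during
 * either write is handled by rolling forward to the terminal node.
 */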
1063
1064 void
1065 rf_CreateRaidOneWriteDAG(
1066 RF_Raid_t * raidPtr,
1067 RF_AccessStripeMap_t * asmap,
1068 RF_DagHeader_t * dag_h,
1069 void *bp,
1070 RF_RaidAccessFlags_t flags,
1071 RF_AllocListElem_t * allocList)
1072 {
1073 RF_DagNode_t *unblockNode, *termNode, *commitNode;
1074 RF_DagNode_t *nodes, *wndNode, *wmirNode;
1075 int nWndNodes, nWmirNodes, i;
1076 RF_ReconUnitNum_t which_ru;
1077 RF_PhysDiskAddr_t *pda, *pdaP;
1078 RF_StripeNum_t parityStripeID;
1079
1080 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1081 asmap->raidAddress, &which_ru);
1082 if (rf_dagDebug) {
1083 printf("[Creating RAID level 1 write DAG]\n");
1084 }
1085 dag_h->creator = "RaidOneWriteDAG";
1086
1087 /* 2 implies access not SU aligned */
1088 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
1089 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
1090
1091 /* alloc the Wnd nodes and the Wmir node */
1092 if (asmap->numDataFailed == 1)
1093 nWndNodes--;
1094 if (asmap->numParityFailed == 1)
1095 nWmirNodes--;
1096
1097 /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock
1098 * + terminator) */
1099 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
1100 (RF_DagNode_t *), allocList);
1101 i = 0;
1102 wndNode = &nodes[i];
1103 i += nWndNodes;
1104 wmirNode = &nodes[i];
1105 i += nWmirNodes;
1106 commitNode = &nodes[i];
1107 i += 1;
1108 unblockNode = &nodes[i];
1109 i += 1;
1110 termNode = &nodes[i];
1111 i += 1;
1112 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
1113
1114 /* this dag can commit immediately */
1115 dag_h->numCommitNodes = 1;
1116 dag_h->numCommits = 0;
1117 dag_h->numSuccedents = 1;
1118
1119 /* initialize the commit, unblock, and term nodes */
1120 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
1121 NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList);
1122 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
1123 NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
1124 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
1125 NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
1126
1127 /* initialize the wnd nodes */
1128 if (nWndNodes > 0) {
1129 pda = asmap->physInfo;
1130 for (i = 0; i < nWndNodes; i++) {
1131 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1132 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
1133 RF_ASSERT(pda != NULL);
1134 wndNode[i].params[0].p = pda;
1135 wndNode[i].params[1].p = pda->bufPtr;
1136 wndNode[i].params[2].v = parityStripeID;
1137 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1138 pda = pda->next;
1139 }
1140 RF_ASSERT(pda == NULL);
1141 }
1142 /* initialize the mirror nodes */
1143 if (nWmirNodes > 0) {
1144 pda = asmap->physInfo;
1145 pdaP = asmap->parityInfo;
1146 for (i = 0; i < nWmirNodes; i++) {
1147 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1148 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
1149 RF_ASSERT(pda != NULL);
1150 wmirNode[i].params[0].p = pdaP;
1151 wmirNode[i].params[1].p = pda->bufPtr;
1152 wmirNode[i].params[2].v = parityStripeID;
1153 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1154 pda = pda->next;
1155 pdaP = pdaP->next;
1156 }
1157 RF_ASSERT(pda == NULL);
1158 RF_ASSERT(pdaP == NULL);
1159 }
1160 /* link the header node to the commit node */
1161 RF_ASSERT(dag_h->numSuccedents == 1);
1162 RF_ASSERT(commitNode->numAntecedents == 0);
1163 dag_h->succedents[0] = commitNode;
1164
1165 /* link the commit node to the write nodes */
1166 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
1167 for (i = 0; i < nWndNodes; i++) {
1168 RF_ASSERT(wndNode[i].numAntecedents == 1);
1169 commitNode->succedents[i] = &wndNode[i];
1170 wndNode[i].antecedents[0] = commitNode;
1171 wndNode[i].antType[0] = rf_control;
1172 }
1173 for (i = 0; i < nWmirNodes; i++) {
1174 RF_ASSERT(wmirNode[i].numAntecedents == 1);
1175 commitNode->succedents[i + nWndNodes] = &wmirNode[i];
1176 wmirNode[i].antecedents[0] = commitNode;
1177 wmirNode[i].antType[0] = rf_control;
1178 }
1179
1180 /* link the write nodes to the unblock node */
1181 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
1182 for (i = 0; i < nWndNodes; i++) {
1183 RF_ASSERT(wndNode[i].numSuccedents == 1);
1184 wndNode[i].succedents[0] = unblockNode;
1185 unblockNode->antecedents[i] = &wndNode[i];
1186 unblockNode->antType[i] = rf_control;
1187 }
1188 for (i = 0; i < nWmirNodes; i++) {
1189 RF_ASSERT(wmirNode[i].numSuccedents == 1);
1190 wmirNode[i].succedents[0] = unblockNode;
1191 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
1192 unblockNode->antType[i + nWndNodes] = rf_control;
1193 }
1194
1195 /* link the unblock node to the term node */
1196 RF_ASSERT(unblockNode->numSuccedents == 1);
1197 RF_ASSERT(termNode->numAntecedents == 1);
1198 RF_ASSERT(termNode->numSuccedents == 0);
1199 unblockNode->succedents[0] = termNode;
1200 termNode->antecedents[0] = unblockNode;
1201 termNode->antType[0] = rf_control;
1202 }
1203
1204
1205
1206 /* DAGs which have no commit points.
1207 *
1208 * The following DAGs are used in forward and backward error recovery experiments.
1209 * They are identical to the DAGs above this comment except that the
1210 * commit points have been removed.
1211 */
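/*
 * Concretely, the only structural changes relative to the committed versions
 * are the ones visible in the functions below:
 *
 *	dag_h->numCommitNodes = 0;	// was 1
 *	// the "Cmt" node is replaced by a plain "Nil" synchronization node
 *
 * Without a commit point the engine has no roll-away decision point, so these
 * graphs are not guaranteed to fail atomically; they exist only for the
 * error-recovery experiments mentioned above.
 */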
1212
1213
1214
1215 void
1216 rf_CommonCreateLargeWriteDAGFwd(
1217 RF_Raid_t * raidPtr,
1218 RF_AccessStripeMap_t * asmap,
1219 RF_DagHeader_t * dag_h,
1220 void *bp,
1221 RF_RaidAccessFlags_t flags,
1222 RF_AllocListElem_t * allocList,
1223 int nfaults,
1224 int (*redFunc) (RF_DagNode_t *),
1225 int allowBufferRecycle)
1226 {
1227 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
1228 RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
1229 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
1230 RF_AccessStripeMapHeader_t *new_asm_h[2];
1231 RF_StripeNum_t parityStripeID;
1232 char *sosBuffer, *eosBuffer;
1233 RF_ReconUnitNum_t which_ru;
1234 RF_RaidLayout_t *layoutPtr;
1235 RF_PhysDiskAddr_t *pda;
1236
1237 layoutPtr = &(raidPtr->Layout);
1238 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
1239
1240 if (rf_dagDebug)
1241 printf("[Creating large-write DAG]\n");
1242 dag_h->creator = "LargeWriteDAGFwd";
1243
1244 dag_h->numCommitNodes = 0;
1245 dag_h->numCommits = 0;
1246 dag_h->numSuccedents = 1;
1247
1248 /* alloc the nodes: Wnd, xor, sync, block, term, and Wnp */
1249 nWndNodes = asmap->numStripeUnitsAccessed;
1250 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1251 i = 0;
1252 wndNodes = &nodes[i];
1253 i += nWndNodes;
1254 xorNode = &nodes[i];
1255 i += 1;
1256 wnpNode = &nodes[i];
1257 i += 1;
1258 blockNode = &nodes[i];
1259 i += 1;
1260 syncNode = &nodes[i];
1261 i += 1;
1262 termNode = &nodes[i];
1263 i += 1;
1264 if (nfaults == 2) {
1265 wnqNode = &nodes[i];
1266 i += 1;
1267 } else {
1268 wnqNode = NULL;
1269 }
1270 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
1271 if (nRodNodes > 0) {
1272 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1273 } else {
1274 rodNodes = NULL;
1275 }
1276
1277 /* begin node initialization */
1278 if (nRodNodes > 0) {
1279 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
1280 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0, dag_h, "Nil", allocList);
1281 } else {
1282 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
1283 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h, "Nil", allocList);
1284 }
1285
1286 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
1287
1288 /* initialize the Rod nodes */
1289 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
1290 if (new_asm_h[asmNum]) {
1291 pda = new_asm_h[asmNum]->stripeMap->physInfo;
1292 while (pda) {
1293 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList);
1294 rodNodes[nodeNum].params[0].p = pda;
1295 rodNodes[nodeNum].params[1].p = pda->bufPtr;
1296 rodNodes[nodeNum].params[2].v = parityStripeID;
1297 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1298 nodeNum++;
1299 pda = pda->next;
1300 }
1301 }
1302 }
1303 RF_ASSERT(nodeNum == nRodNodes);
1304
1305 /* initialize the wnd nodes */
1306 pda = asmap->physInfo;
1307 for (i = 0; i < nWndNodes; i++) {
1308 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1309 RF_ASSERT(pda != NULL);
1310 wndNodes[i].params[0].p = pda;
1311 wndNodes[i].params[1].p = pda->bufPtr;
1312 wndNodes[i].params[2].v = parityStripeID;
1313 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1314 pda = pda->next;
1315 }
1316
1317 /* initialize the redundancy node */
1318 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, nfaults, 1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
1319 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
1320 for (i = 0; i < nWndNodes; i++) {
1321 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */
1322 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */
1323 }
1324 for (i = 0; i < nRodNodes; i++) {
1325 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */
1326 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */
1327 }
1328 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; /* xor node needs to get
1329 * at RAID information */
1330
1331 /* look for an Rod node that reads a complete SU. If none, alloc a
1332 * buffer to receive the parity info. Note that we can't use a new
1333 * data buffer because it will not have gotten written when the xor
1334 * occurs. */
1335 if (allowBufferRecycle) {
1336 for (i = 0; i < nRodNodes; i++)
1337 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
1338 break;
1339 }
1340 if ((!allowBufferRecycle) || (i == nRodNodes)) {
1341 RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
1342 } else
1343 xorNode->results[0] = rodNodes[i].params[1].p;
1344
1345 /* initialize the Wnp node */
1346 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
1347 wnpNode->params[0].p = asmap->parityInfo;
1348 wnpNode->params[1].p = xorNode->results[0];
1349 wnpNode->params[2].v = parityStripeID;
1350 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1351 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must
1352 * describe entire
1353 * parity unit */
1354
1355 if (nfaults == 2) {
1356 /* we never try to recycle a buffer for the Q calculation in
1357 * addition to the parity. This would cause two buffers to get
1358 * smashed during the P and Q calculation, guaranteeing one
1359 * would be wrong. */
1360 RF_CallocAndAdd(xorNode->results[1], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
1361 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
1362 wnqNode->params[0].p = asmap->qInfo;
1363 wnqNode->params[1].p = xorNode->results[1];
1364 wnqNode->params[2].v = parityStripeID;
1365 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1366 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must
1367 * describe entire
1368 * parity unit */
1369 }
1370 /* connect nodes to form graph */
1371
1372 /* connect dag header to block node */
1373 RF_ASSERT(blockNode->numAntecedents == 0);
1374 dag_h->succedents[0] = blockNode;
1375
1376 if (nRodNodes > 0) {
1377 /* connect the block node to the Rod nodes */
1378 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
1379 RF_ASSERT(syncNode->numAntecedents == nRodNodes);
1380 for (i = 0; i < nRodNodes; i++) {
1381 RF_ASSERT(rodNodes[i].numAntecedents == 1);
1382 blockNode->succedents[i] = &rodNodes[i];
1383 rodNodes[i].antecedents[0] = blockNode;
1384 rodNodes[i].antType[0] = rf_control;
1385
1386 /* connect the Rod nodes to the Nil node */
1387 RF_ASSERT(rodNodes[i].numSuccedents == 1);
1388 rodNodes[i].succedents[0] = syncNode;
1389 syncNode->antecedents[i] = &rodNodes[i];
1390 syncNode->antType[i] = rf_trueData;
1391 }
1392 } else {
1393 /* connect the block node to the Nil node */
1394 RF_ASSERT(blockNode->numSuccedents == 1);
1395 RF_ASSERT(syncNode->numAntecedents == 1);
1396 blockNode->succedents[0] = syncNode;
1397 syncNode->antecedents[0] = blockNode;
1398 syncNode->antType[0] = rf_control;
1399 }
1400
1401 /* connect the sync node to the Wnd nodes */
1402 RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
1403 for (i = 0; i < nWndNodes; i++) {
1404 RF_ASSERT(wndNodes[i].numAntecedents == 1);
1405 syncNode->succedents[i] = &wndNodes[i];
1406 wndNodes[i].antecedents[0] = syncNode;
1407 wndNodes[i].antType[0] = rf_control;
1408 }
1409
1410 /* connect the sync node to the Xor node */
1411 RF_ASSERT(xorNode->numAntecedents == 1);
1412 syncNode->succedents[nWndNodes] = xorNode;
1413 xorNode->antecedents[0] = syncNode;
1414 xorNode->antType[0] = rf_control;
1415
1416 /* connect the xor node to the write parity node */
1417 RF_ASSERT(xorNode->numSuccedents == nfaults);
1418 RF_ASSERT(wnpNode->numAntecedents == 1);
1419 xorNode->succedents[0] = wnpNode;
1420 wnpNode->antecedents[0] = xorNode;
1421 wnpNode->antType[0] = rf_trueData;
1422 if (nfaults == 2) {
1423 RF_ASSERT(wnqNode->numAntecedents == 1);
1424 xorNode->succedents[1] = wnqNode;
1425 wnqNode->antecedents[0] = xorNode;
1426 wnqNode->antType[0] = rf_trueData;
1427 }
1428 /* connect the write nodes to the term node */
1429 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
1430 RF_ASSERT(termNode->numSuccedents == 0);
1431 for (i = 0; i < nWndNodes; i++) {
1432 RF_ASSERT(wndNodes[i].numSuccedents == 1);
1433 wndNodes[i].succedents[0] = termNode;
1434 termNode->antecedents[i] = &wndNodes[i];
1435 termNode->antType[i] = rf_control;
1436 }
1437 RF_ASSERT(wnpNode->numSuccedents == 1);
1438 wnpNode->succedents[0] = termNode;
1439 termNode->antecedents[nWndNodes] = wnpNode;
1440 termNode->antType[nWndNodes] = rf_control;
1441 if (nfaults == 2) {
1442 RF_ASSERT(wnqNode->numSuccedents == 1);
1443 wnqNode->succedents[0] = termNode;
1444 termNode->antecedents[nWndNodes + 1] = wnqNode;
1445 termNode->antType[nWndNodes + 1] = rf_control;
1446 }
1447 }
1448
1449
1450 /******************************************************************************
1451 *
1452 * creates a DAG to perform a small-write operation (either raid 5 or pq),
1453 * which is as follows:
1454 *
1455 * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
1456 * \- Rod X- Wnd [Und] -------/
1457 * [\- Rod X- Wnd [Und] ------/]
1458 * [\- Roq - Q --> Wnq [Unq]-/]
1459 *
1460 * Rop = read old parity
1461 * Rod = read old data
1462 * Roq = read old "q"
1463 * Cmt = commit node
1464 * Und = unlock data disk
1465 * Unp = unlock parity disk
1466 * Unq = unlock q disk
1467 * Wnp = write new parity
1468 * Wnd = write new data
1469 * Wnq = write new "q"
1470 * [ ] denotes optional segments in the graph
1471 *
1472 * Parameters: raidPtr - description of the physical array
1473 * asmap - logical & physical addresses for this access
1474 * bp - buffer ptr (holds write data)
1475 * flags - general flags (e.g. disk locking)
1476 * allocList - list of memory allocated in DAG creation
1477 * pfuncs - list of parity generating functions
1478 * qfuncs - list of q generating functions
1479 *
1480 * A NULL qfuncs indicates a single-fault-tolerant array.
1481 *****************************************************************************/
1482
1483 void
1484 rf_CommonCreateSmallWriteDAGFwd(
1485 RF_Raid_t * raidPtr,
1486 RF_AccessStripeMap_t * asmap,
1487 RF_DagHeader_t * dag_h,
1488 void *bp,
1489 RF_RaidAccessFlags_t flags,
1490 RF_AllocListElem_t * allocList,
1491 RF_RedFuncs_t * pfuncs,
1492 RF_RedFuncs_t * qfuncs)
1493 {
1494 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
1495 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
1496 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
1497 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
1498 int i, j, nNodes, totalNumNodes, lu_flag;
1499 RF_ReconUnitNum_t which_ru;
1500 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
1501 int (*qfunc) (RF_DagNode_t *);
1502 int numDataNodes, numParityNodes;
1503 RF_StripeNum_t parityStripeID;
1504 RF_PhysDiskAddr_t *pda;
1505 char *name, *qname;
1506 long nfaults;
1507
1508 nfaults = qfuncs ? 2 : 1;
1509 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
1510
1511 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
1512 pda = asmap->physInfo;
1513 numDataNodes = asmap->numStripeUnitsAccessed;
1514 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
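/* Note: numParityNodes is 2 only when the parity region of this access is
 * split across two physical extents (asmap->parityInfo has a second entry);
 * in that case a separate Rop, Xor and Wnp node is built for each extent. */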
1515
1516 if (rf_dagDebug)
1517 printf("[Creating small-write DAG]\n");
1518 RF_ASSERT(numDataNodes > 0);
1519 dag_h->creator = "SmallWriteDAGFwd";
1520
1521 dag_h->numCommitNodes = 0;
1522 dag_h->numCommits = 0;
1523 dag_h->numSuccedents = 1;
1524
1525 qfunc = NULL;
1526 qname = NULL;
1527
1528 /* DAG creation occurs in four steps:
1529 * 1. count the number of nodes in the DAG  2. create the nodes
1530 * 3. initialize the nodes                  4. connect the nodes */
1531
1532 /* Step 1. compute number of nodes in the graph */
1533
1534 /* number of nodes: a read and a write for each data unit; a redundancy
1535 * computation node for each parity unit (nfaults * numParityNodes); a read
1536 * and a write for each parity/q unit; a block node; a terminate node; and,
1537 * if atomic RMW, an unlock node for each data unit and redundancy unit */
1538 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) + (nfaults * 2 * numParityNodes) + 2;
1539 if (lu_flag)
1540 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
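/* Worked example: a RAID-5 small write (nfaults = 1) over two data units
 * with an unsplit parity region (numDataNodes = 2, numParityNodes = 1)
 * gives 2*2 + 1*1 + 1*2*1 + 2 = 9 nodes, plus 2 + 1*1 = 3 unlock nodes
 * when atomic RMW locking is enabled. */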
1541
1542
1543 /* Step 2. create the nodes */
1544 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1545 i = 0;
1546 blockNode = &nodes[i];
1547 i += 1;
1548 readDataNodes = &nodes[i];
1549 i += numDataNodes;
1550 readParityNodes = &nodes[i];
1551 i += numParityNodes;
1552 writeDataNodes = &nodes[i];
1553 i += numDataNodes;
1554 writeParityNodes = &nodes[i];
1555 i += numParityNodes;
1556 xorNodes = &nodes[i];
1557 i += numParityNodes;
1558 termNode = &nodes[i];
1559 i += 1;
1560 if (lu_flag) {
1561 unlockDataNodes = &nodes[i];
1562 i += numDataNodes;
1563 unlockParityNodes = &nodes[i];
1564 i += numParityNodes;
1565 } else {
1566 unlockDataNodes = unlockParityNodes = NULL;
1567 }
1568 if (nfaults == 2) {
1569 readQNodes = &nodes[i];
1570 i += numParityNodes;
1571 writeQNodes = &nodes[i];
1572 i += numParityNodes;
1573 qNodes = &nodes[i];
1574 i += numParityNodes;
1575 if (lu_flag) {
1576 unlockQNodes = &nodes[i];
1577 i += numParityNodes;
1578 } else {
1579 unlockQNodes = NULL;
1580 }
1581 } else {
1582 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
1583 }
1584 RF_ASSERT(i == totalNumNodes);
1585
1586 /* Step 3. initialize the nodes */
1587 /* initialize block node (Nil) */
1588 nNodes = numDataNodes + (nfaults * numParityNodes);
1589 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
1590
1591 /* initialize terminate node (Trm) */
1592 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
1593
1594 /* initialize nodes which read old data (Rod) */
1595 for (i = 0; i < numDataNodes; i++) {
1596 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h, "Rod", allocList);
1597 RF_ASSERT(pda != NULL);
1598 readDataNodes[i].params[0].p = pda; /* physical disk addr
1599 * desc */
1600 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old
1601 * data */
1602 readDataNodes[i].params[2].v = parityStripeID;
1603 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1604 pda = pda->next;
1605 for (j = 0; j < readDataNodes[i].numSuccedents; j++)
1606 readDataNodes[i].propList[j] = NULL;
1607 }
1608
1609 /* initialize nodes which read old parity (Rop) */
1610 pda = asmap->parityInfo;
1612 for (i = 0; i < numParityNodes; i++) {
1613 RF_ASSERT(pda != NULL);
1614 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
1615 readParityNodes[i].params[0].p = pda;
1616 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old
1617 * parity */
1618 readParityNodes[i].params[2].v = parityStripeID;
1619 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1620 for (j = 0; j < readParityNodes[i].numSuccedents; j++)
1621 readParityNodes[i].propList[j] = NULL;
1622 pda = pda->next;
1623 }
1624
1625 /* initialize nodes which read old Q (Roq) */
1626 if (nfaults == 2) {
1627 pda = asmap->qInfo;
1628 for (i = 0; i < numParityNodes; i++) {
1629 RF_ASSERT(pda != NULL);
1630 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
1631 readQNodes[i].params[0].p = pda;
1632 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old Q */
1633 readQNodes[i].params[2].v = parityStripeID;
1634 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1635 for (j = 0; j < readQNodes[i].numSuccedents; j++)
1636 readQNodes[i].propList[j] = NULL;
1637 pda = pda->next;
1638 }
1639 }
1640 /* initialize nodes which write new data (Wnd) */
1641 pda = asmap->physInfo;
1642 for (i = 0; i < numDataNodes; i++) {
1643 RF_ASSERT(pda != NULL);
1644 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1645 writeDataNodes[i].params[0].p = pda; /* physical disk addr
1646 * desc */
1647 writeDataNodes[i].params[1].p = pda->bufPtr; /* buffer holding new
1648 * data to be written */
1649 writeDataNodes[i].params[2].v = parityStripeID;
1650 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1651
1652 if (lu_flag) {
1653 /* initialize node to unlock the disk queue */
1654 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
1655 unlockDataNodes[i].params[0].p = pda; /* physical disk addr
1656 * desc */
1657 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1658 }
1659 pda = pda->next;
1660 }
1661
1662
1663 /* initialize nodes which compute new parity and Q */
1664 /* we use the simple XOR func in the double-XOR case, and when we're
1665 * accessing only a portion of one stripe unit. the distinction
1666 * between the two is that the regular XOR func assumes that the
1667 * targbuf is a full SU in size, and examines the pda associated with
1668 * the buffer to decide where within the buffer to XOR the data,
1669 * whereas the simple XOR func just XORs the data into the start of
1670 * the buffer. */
1671 if ((numParityNodes == 2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
1672 func = pfuncs->simple;
1673 undoFunc = rf_NullNodeUndoFunc;
1674 name = pfuncs->SimpleName;
1675 if (qfuncs) {
1676 qfunc = qfuncs->simple;
1677 qname = qfuncs->SimpleName;
1678 }
1679 } else {
1680 func = pfuncs->regular;
1681 undoFunc = rf_NullNodeUndoFunc;
1682 name = pfuncs->RegularName;
1683 if (qfuncs) {
1684 qfunc = qfuncs->regular;
1685 qname = qfuncs->RegularName;
1686 }
1687 }
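/*
 * For illustration only (not the RAIDframe implementation; names are
 * hypothetical): a "simple" XOR in the sense above just folds each source
 * buffer into the start of the target buffer, roughly
 *
 *     for (k = 0; k < length; k++)
 *             targbuf[k] ^= srcbuf[k];
 *
 * while a "regular" XOR first uses the pda attached to each source buffer
 * to locate the corresponding byte range inside a full-stripe-unit-sized
 * target buffer before XORing.
 */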
1688 /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop}
1689 * nodes, and raidPtr */
1690 if (numParityNodes == 2) { /* double-xor case */
1691 for (i = 0; i < numParityNodes; i++) {
1692 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, name, allocList); /* no wakeup func for
1693 * xor */
1694 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
1695 xorNodes[i].params[0] = readDataNodes[i].params[0];
1696 xorNodes[i].params[1] = readDataNodes[i].params[1];
1697 xorNodes[i].params[2] = readParityNodes[i].params[0];
1698 xorNodes[i].params[3] = readParityNodes[i].params[1];
1699 xorNodes[i].params[4] = writeDataNodes[i].params[0];
1700 xorNodes[i].params[5] = writeDataNodes[i].params[1];
1701 xorNodes[i].params[6].p = raidPtr;
1702 xorNodes[i].results[0] = readParityNodes[i].params[1].p; /* use old parity buf as
1703 * target buf */
1704 if (nfaults == 2) {
1705 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, qname, allocList); /* no wakeup func for
1706 * xor */
1707 qNodes[i].params[0] = readDataNodes[i].params[0];
1708 qNodes[i].params[1] = readDataNodes[i].params[1];
1709 qNodes[i].params[2] = readQNodes[i].params[0];
1710 qNodes[i].params[3] = readQNodes[i].params[1];
1711 qNodes[i].params[4] = writeDataNodes[i].params[0];
1712 qNodes[i].params[5] = writeDataNodes[i].params[1];
1713 qNodes[i].params[6].p = raidPtr;
1714 qNodes[i].results[0] = readQNodes[i].params[1].p; /* use old Q buf as
1715 * target buf */
1716 }
1717 }
1718 } else {
1719 /* there is only one xor node in this case */
1720 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
1721 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
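/* Note: the loop below runs to numDataNodes + 1.  Because readParityNodes
 * immediately follows readDataNodes in the node array allocated above,
 * readDataNodes[numDataNodes] aliases readParityNodes[0], so the final
 * iteration copies the Rop node's pda and buffer into the xor params. */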
1722 for (i = 0; i < numDataNodes + 1; i++) {
1723 /* set up params related to Rod and Rop nodes */
1724 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
1725 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */
1726 }
1727 for (i = 0; i < numDataNodes; i++) {
1728 /* set up params related to Wnd nodes */
1729 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */
1730 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */
1731 }
1732 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get
1733 * at RAID information */
1734 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
1735 if (nfaults == 2) {
1736 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, qname, allocList);
1737 for (i = 0; i < numDataNodes; i++) {
1738 /* set up params related to Rod */
1739 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
1740 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */
1741 }
1742 /* and read old q */
1743 qNodes[0].params[2 * numDataNodes + 0] = readQNodes[0].params[0]; /* pda */
1744 qNodes[0].params[2 * numDataNodes + 1] = readQNodes[0].params[1]; /* buffer pointer */
1745 for (i = 0; i < numDataNodes; i++) {
1746 /* set up params related to Wnd nodes */
1747 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */
1748 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */
1749 }
1750 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get
1751 * at RAID information */
1752 qNodes[0].results[0] = readQNodes[0].params[1].p;
1753 }
1754 }
1755
1756 /* initialize nodes which write new parity (Wnp) */
1757 pda = asmap->parityInfo;
1758 for (i = 0; i < numParityNodes; i++) {
1759 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnp", allocList);
1760 RF_ASSERT(pda != NULL);
1761 writeParityNodes[i].params[0].p = pda;	/* physical disk addr
1762 * desc */
1763 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for
1764 * parity write
1765 * operation */
1766 writeParityNodes[i].params[2].v = parityStripeID;
1767 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1768
1769 if (lu_flag) {
1770 /* initialize node to unlock the disk queue */
1771 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp", allocList);
1772 unlockParityNodes[i].params[0].p = pda; /* physical disk addr
1773 * desc */
1774 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1775 }
1776 pda = pda->next;
1777 }
1778
1779 /* initialize nodes which write new Q (Wnq) */
1780 if (nfaults == 2) {
1781 pda = asmap->qInfo;
1782 for (i = 0; i < numParityNodes; i++) {
1783 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnq", allocList);
1784 RF_ASSERT(pda != NULL);
1785 writeQNodes[i].params[0].p = pda;	/* physical disk addr
1786 * desc */
1787 writeQNodes[i].params[1].p = qNodes[i].results[0];	/* buffer pointer for
1788 * Q write
1789 * operation */
1790 writeQNodes[i].params[2].v = parityStripeID;
1791 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1792
1793 if (lu_flag) {
1794 /* initialize node to unlock the disk queue */
1795 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unq", allocList);
1796 unlockQNodes[i].params[0].p = pda; /* physical disk addr
1797 * desc */
1798 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1799 }
1800 pda = pda->next;
1801 }
1802 }
1803 /* Step 4. connect the nodes */
1804
1805 /* connect header to block node */
1806 dag_h->succedents[0] = blockNode;
1807
1808 /* connect block node to read old data nodes */
1809 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
1810 for (i = 0; i < numDataNodes; i++) {
1811 blockNode->succedents[i] = &readDataNodes[i];
1812 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
1813 readDataNodes[i].antecedents[0] = blockNode;
1814 readDataNodes[i].antType[0] = rf_control;
1815 }
1816
1817 /* connect block node to read old parity nodes */
1818 for (i = 0; i < numParityNodes; i++) {
1819 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
1820 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
1821 readParityNodes[i].antecedents[0] = blockNode;
1822 readParityNodes[i].antType[0] = rf_control;
1823 }
1824
1825 /* connect block node to read old Q nodes */
1826 if (nfaults == 2)
1827 for (i = 0; i < numParityNodes; i++) {
1828 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
1829 RF_ASSERT(readQNodes[i].numAntecedents == 1);
1830 readQNodes[i].antecedents[0] = blockNode;
1831 readQNodes[i].antType[0] = rf_control;
1832 }
1833
1834 /* connect read old data nodes to write new data nodes */
1835 for (i = 0; i < numDataNodes; i++) {
1836 RF_ASSERT(readDataNodes[i].numSuccedents == ((nfaults * numParityNodes) + 1));
1837 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
1838 readDataNodes[i].succedents[0] = &writeDataNodes[i];
1839 writeDataNodes[i].antecedents[0] = &readDataNodes[i];
1840 writeDataNodes[i].antType[0] = rf_antiData;
1841 }
1842
1843 /* connect read old data nodes to xor nodes */
1844 for (i = 0; i < numDataNodes; i++) {
1845 for (j = 0; j < numParityNodes; j++) {
1846 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
1847 readDataNodes[i].succedents[1 + j] = &xorNodes[j];
1848 xorNodes[j].antecedents[i] = &readDataNodes[i];
1849 xorNodes[j].antType[i] = rf_trueData;
1850 }
1851 }
1852
1853 /* connect read old data nodes to q nodes */
1854 if (nfaults == 2)
1855 for (i = 0; i < numDataNodes; i++)
1856 for (j = 0; j < numParityNodes; j++) {
1857 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
1858 readDataNodes[i].succedents[1 + numParityNodes + j] = &qNodes[j];
1859 qNodes[j].antecedents[i] = &readDataNodes[i];
1860 qNodes[j].antType[i] = rf_trueData;
1861 }
1862
1863 /* connect read old parity nodes to xor nodes */
1864 for (i = 0; i < numParityNodes; i++) {
1865 for (j = 0; j < numParityNodes; j++) {
1866 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
1867 readParityNodes[i].succedents[j] = &xorNodes[j];
1868 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
1869 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
1870 }
1871 }
1872
1873 /* connect read old q nodes to q nodes */
1874 if (nfaults == 2)
1875 for (i = 0; i < numParityNodes; i++) {
1876 for (j = 0; j < numParityNodes; j++) {
1877 RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
1878 readQNodes[i].succedents[j] = &qNodes[j];
1879 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
1880 qNodes[j].antType[numDataNodes + i] = rf_trueData;
1881 }
1882 }
1883
1884 /* connect xor nodes to the write new parity nodes */
1885 for (i = 0; i < numParityNodes; i++) {
1886 RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
1887 for (j = 0; j < numParityNodes; j++) {
1888 RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
1889 xorNodes[i].succedents[j] = &writeParityNodes[j];
1890 writeParityNodes[j].antecedents[i] = &xorNodes[i];
1891 writeParityNodes[j].antType[i] = rf_trueData;
1892 }
1893 }
1894
1895 /* connect q nodes to the write new q nodes */
1896 if (nfaults == 2)
1897 for (i = 0; i < numParityNodes; i++) {
1898 RF_ASSERT(writeQNodes[i].numAntecedents == numParityNodes);
1899 for (j = 0; j < numParityNodes; j++) {
1900 RF_ASSERT(qNodes[j].numSuccedents == numParityNodes);
1901 qNodes[i].succedents[j] = &writeQNodes[j];
1902 writeQNodes[j].antecedents[i] = &qNodes[i];
1903 writeQNodes[j].antType[i] = rf_trueData;
1904 }
1905 }
1906
1907 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1908 RF_ASSERT(termNode->numSuccedents == 0);
1909 for (i = 0; i < numDataNodes; i++) {
1910 if (lu_flag) {
1911 /* connect write new data nodes to unlock nodes */
1912 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1913 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1914 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1915 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1916 unlockDataNodes[i].antType[0] = rf_control;
1917
1918 /* connect unlock nodes to term node */
1919 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1920 unlockDataNodes[i].succedents[0] = termNode;
1921 termNode->antecedents[i] = &unlockDataNodes[i];
1922 termNode->antType[i] = rf_control;
1923 } else {
1924 /* connect write new data nodes to term node */
1925 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1926 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1927 writeDataNodes[i].succedents[0] = termNode;
1928 termNode->antecedents[i] = &writeDataNodes[i];
1929 termNode->antType[i] = rf_control;
1930 }
1931 }
1932
1933 for (i = 0; i < numParityNodes; i++) {
1934 if (lu_flag) {
1935 /* connect write new parity nodes to unlock nodes */
1936 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1937 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1938 writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
1939 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
1940 unlockParityNodes[i].antType[0] = rf_control;
1941
1942 /* connect unlock nodes to term node */
1943 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1944 unlockParityNodes[i].succedents[0] = termNode;
1945 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
1946 termNode->antType[numDataNodes + i] = rf_control;
1947 } else {
1948 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1949 writeParityNodes[i].succedents[0] = termNode;
1950 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
1951 termNode->antType[numDataNodes + i] = rf_control;
1952 }
1953 }
1954
1955 if (nfaults == 2)
1956 for (i = 0; i < numParityNodes; i++) {
1957 if (lu_flag) {
1958 /* connect write new Q nodes to unlock nodes */
1959 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1960 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1961 writeQNodes[i].succedents[0] = &unlockQNodes[i];
1962 unlockQNodes[i].antecedents[0] = &writeQNodes[i];
1963 unlockQNodes[i].antType[0] = rf_control;
1964
1965 /* connect unlock nodes to term node */
1966 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1967 unlockQNodes[i].succedents[0] = termNode;
1968 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
1969 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1970 } else {
1971 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1972 writeQNodes[i].succedents[0] = termNode;
1973 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
1974 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1975 }
1976 }
1977 }
1978
1979
1980
1981 /******************************************************************************
1982 * create a write graph (fault-free or degraded) for RAID level 1
1983 *
1984 * Hdr -> Nil -> Wpd -> Nil -> Trm
1985 *           \-> Wsd ---/
1986 *
1987 * The "Wpd" node writes data to the primary copy in the mirror pair
1988 * The "Wsd" node writes data to the secondary copy in the mirror pair
1989 *
1990 * Parameters: raidPtr - description of the physical array
1991 * asmap - logical & physical addresses for this access
1992 * bp - buffer ptr (holds write data)
1993 * flags - general flags (e.g. disk locking)
1994 * allocList - list of memory allocated in DAG creation
1995 *****************************************************************************/
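/* Example: a fault-free write that is not stripe-unit aligned on either copy
 * yields nWndNodes = nWmirNodes = 2, for a seven-node graph (two Wpd, two
 * Wsd, plus the block, unblock and terminate nodes). */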
1996
1997 void
1998 rf_CreateRaidOneWriteDAGFwd(
1999 RF_Raid_t * raidPtr,
2000 RF_AccessStripeMap_t * asmap,
2001 RF_DagHeader_t * dag_h,
2002 void *bp,
2003 RF_RaidAccessFlags_t flags,
2004 RF_AllocListElem_t * allocList)
2005 {
2006 RF_DagNode_t *blockNode, *unblockNode, *termNode;
2007 RF_DagNode_t *nodes, *wndNode, *wmirNode;
2008 int nWndNodes, nWmirNodes, i;
2009 RF_ReconUnitNum_t which_ru;
2010 RF_PhysDiskAddr_t *pda, *pdaP;
2011 RF_StripeNum_t parityStripeID;
2012
2013 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
2014 asmap->raidAddress, &which_ru);
2015 if (rf_dagDebug) {
2016 printf("[Creating RAID level 1 write DAG]\n");
2017 }
2018 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; /* 2 implies access not
2019 * SU aligned */
2020 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
2021
2022 /* alloc the Wnd nodes and the Wmir node */
2023 if (asmap->numDataFailed == 1)
2024 nWndNodes--;
2025 if (asmap->numParityFailed == 1)
2026 nWmirNodes--;
2027
2028 /* total number of nodes = nWndNodes + nWmirNodes + (block + unblock +
2029 * terminator) */
2030 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
2031 i = 0;
2032 wndNode = &nodes[i];
2033 i += nWndNodes;
2034 wmirNode = &nodes[i];
2035 i += nWmirNodes;
2036 blockNode = &nodes[i];
2037 i += 1;
2038 unblockNode = &nodes[i];
2039 i += 1;
2040 termNode = &nodes[i];
2041 i += 1;
2042 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
2043
2044 /* this dag can commit immediately */
2045 dag_h->numCommitNodes = 0;
2046 dag_h->numCommits = 0;
2047 dag_h->numSuccedents = 1;
2048
2049 /* initialize the unblock and term nodes */
2050 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Nil", allocList);
2051 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
2052 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
2053
2054 /* initialize the wnd nodes */
2055 if (nWndNodes > 0) {
2056 pda = asmap->physInfo;
2057 for (i = 0; i < nWndNodes; i++) {
2058 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
2059 RF_ASSERT(pda != NULL);
2060 wndNode[i].params[0].p = pda;
2061 wndNode[i].params[1].p = pda->bufPtr;
2062 wndNode[i].params[2].v = parityStripeID;
2063 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2064 pda = pda->next;
2065 }
2066 RF_ASSERT(pda == NULL);
2067 }
2068 /* initialize the mirror nodes */
2069 if (nWmirNodes > 0) {
2070 pda = asmap->physInfo;
2071 pdaP = asmap->parityInfo;
2072 for (i = 0; i < nWmirNodes; i++) {
2073 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
2074 RF_ASSERT(pda != NULL);
2075 wmirNode[i].params[0].p = pdaP;
2076 wmirNode[i].params[1].p = pda->bufPtr;
2077 wmirNode[i].params[2].v = parityStripeID;
2078 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2079 pda = pda->next;
2080 pdaP = pdaP->next;
2081 }
2082 RF_ASSERT(pda == NULL);
2083 RF_ASSERT(pdaP == NULL);
2084 }
2085 /* link the header node to the block node */
2086 RF_ASSERT(dag_h->numSuccedents == 1);
2087 RF_ASSERT(blockNode->numAntecedents == 0);
2088 dag_h->succedents[0] = blockNode;
2089
2090 /* link the block node to the write nodes */
2091 RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
2092 for (i = 0; i < nWndNodes; i++) {
2093 RF_ASSERT(wndNode[i].numAntecedents == 1);
2094 blockNode->succedents[i] = &wndNode[i];
2095 wndNode[i].antecedents[0] = blockNode;
2096 wndNode[i].antType[0] = rf_control;
2097 }
2098 for (i = 0; i < nWmirNodes; i++) {
2099 RF_ASSERT(wmirNode[i].numAntecedents == 1);
2100 blockNode->succedents[i + nWndNodes] = &wmirNode[i];
2101 wmirNode[i].antecedents[0] = blockNode;
2102 wmirNode[i].antType[0] = rf_control;
2103 }
2104
2105 /* link the write nodes to the unblock node */
2106 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
2107 for (i = 0; i < nWndNodes; i++) {
2108 RF_ASSERT(wndNode[i].numSuccedents == 1);
2109 wndNode[i].succedents[0] = unblockNode;
2110 unblockNode->antecedents[i] = &wndNode[i];
2111 unblockNode->antType[i] = rf_control;
2112 }
2113 for (i = 0; i < nWmirNodes; i++) {
2114 RF_ASSERT(wmirNode[i].numSuccedents == 1);
2115 wmirNode[i].succedents[0] = unblockNode;
2116 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
2117 unblockNode->antType[i + nWndNodes] = rf_control;
2118 }
2119
2120 /* link the unblock node to the term node */
2121 RF_ASSERT(unblockNode->numSuccedents == 1);
2122 RF_ASSERT(termNode->numAntecedents == 1);
2123 RF_ASSERT(termNode->numSuccedents == 0);
2124 unblockNode->succedents[0] = termNode;
2125 termNode->antecedents[0] = unblockNode;
2126 termNode->antType[0] = rf_control;
2127
2128 return;
2129 }
2130