/*	$NetBSD: rf_dagffwr.c,v 1.25 2004/03/23 21:53:36 oster Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution (at) CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/*
 * rf_dagffwr.c
 *
 * code for creating fault-free DAGs
 *
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.25 2004/03/23 21:53:36 oster Exp $");

#include <dev/raidframe/raidframevar.h>

#include "rf_raid.h"
#include "rf_dag.h"
#include "rf_dagutils.h"
#include "rf_dagfuncs.h"
#include "rf_debugMem.h"
#include "rf_dagffrd.h"
#include "rf_general.h"
#include "rf_dagffwr.h"
#include "rf_map.h"

/******************************************************************************
 *
 * General comments on DAG creation:
 *
 * All DAGs in this file use roll-away error recovery.  Each DAG has a single
 * commit node, usually called "Cmt."  If an error occurs before the Cmt node
 * is reached, the execution engine will halt forward execution and work
 * backward through the graph, executing the undo functions.  Assuming that
 * each node in the graph prior to the Cmt node is undoable and atomic - or -
 * does not make changes to permanent state, the graph will fail atomically.
 * If an error occurs after the Cmt node executes, the engine will roll-forward
 * through the graph, blindly executing nodes until it reaches the end.
 * If a graph reaches the end, it is assumed to have completed successfully.
 *
 * A graph has only 1 Cmt node.
 *
 */


/******************************************************************************
 *
 * The following wrappers map the standard DAG creation interface to the
 * DAG creation routines.  Additionally, these wrappers enable experimentation
 * with new DAG structures by providing an extra level of indirection, allowing
 * the DAG creation routines to be replaced at this single point.
 */


79 void
80 rf_CreateNonRedundantWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
81 RF_DagHeader_t *dag_h, void *bp,
82 RF_RaidAccessFlags_t flags,
83 RF_AllocListElem_t *allocList,
84 RF_IoType_t type)
85 {
86 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
87 RF_IO_TYPE_WRITE);
88 }
89
90 void
91 rf_CreateRAID0WriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
92 RF_DagHeader_t *dag_h, void *bp,
93 RF_RaidAccessFlags_t flags,
94 RF_AllocListElem_t *allocList,
95 RF_IoType_t type)
96 {
97 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
98 RF_IO_TYPE_WRITE);
99 }
100
101 void
102 rf_CreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
103 RF_DagHeader_t *dag_h, void *bp,
104 RF_RaidAccessFlags_t flags,
105 RF_AllocListElem_t *allocList)
106 {
107 /* "normal" rollaway */
108 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags,
109 allocList, &rf_xorFuncs, NULL);
110 }
111
112 void
113 rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
114 RF_DagHeader_t *dag_h, void *bp,
115 RF_RaidAccessFlags_t flags,
116 RF_AllocListElem_t *allocList)
117 {
118 /* "normal" rollaway */
119 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags,
120 allocList, 1, rf_RegularXorFunc, RF_TRUE);
121 }
122

/******************************************************************************
 *
 * DAG creation code begins here
 */


/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *           / Rod \           / Wnd \
 * H -- block- Rod - Xor - Cmt - Wnd --- T
 *           \ Rod /          \  Wnp  /
 *                             \[Wnq]/
 *
 * The XOR node also does the Q calculation in the P+Q architecture.
 * All nodes are before the commit node (Cmt) are assumed to be atomic and
 * undoable - or - they make no changes to permanent state.
 *
 * Rod = read old data
 * Cmt = commit node
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              nfaults   - number of faults array can tolerate
 *                          (equal to # redundancy units in stripe)
 *              redfuncs  - list of redundancy generating functions
 *
 *****************************************************************************/

161 void
162 rf_CommonCreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
163 RF_DagHeader_t *dag_h, void *bp,
164 RF_RaidAccessFlags_t flags,
165 RF_AllocListElem_t *allocList,
166 int nfaults, int (*redFunc) (RF_DagNode_t *),
167 int allowBufferRecycle)
168 {
169 RF_DagNode_t *wndNodes, *rodNodes, *xorNode, *wnpNode, *tmpNode;
170 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
171 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
172 RF_AccessStripeMapHeader_t *new_asm_h[2];
173 RF_StripeNum_t parityStripeID;
174 char *sosBuffer, *eosBuffer;
175 RF_ReconUnitNum_t which_ru;
176 RF_RaidLayout_t *layoutPtr;
177 RF_PhysDiskAddr_t *pda;
178
179 layoutPtr = &(raidPtr->Layout);
180 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr,
181 asmap->raidAddress,
182 &which_ru);
183
184 #if RF_DEBUG_DAG
185 if (rf_dagDebug) {
186 printf("[Creating large-write DAG]\n");
187 }
188 #endif
189 dag_h->creator = "LargeWriteDAG";
190
191 dag_h->numCommitNodes = 1;
192 dag_h->numCommits = 0;
193 dag_h->numSuccedents = 1;
194
195 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
196 nWndNodes = asmap->numStripeUnitsAccessed;
197
198 for (i = 0; i < nWndNodes; i++) {
199 tmpNode = rf_AllocDAGNode();
200 tmpNode->list_next = dag_h->nodes;
201 dag_h->nodes = tmpNode;
202 }
203 wndNodes = dag_h->nodes;
204
205 xorNode = rf_AllocDAGNode();
206 xorNode->list_next = dag_h->nodes;
207 dag_h->nodes = xorNode;
208
209 wnpNode = rf_AllocDAGNode();
210 wnpNode->list_next = dag_h->nodes;
211 dag_h->nodes = wnpNode;
212
213 blockNode = rf_AllocDAGNode();
214 blockNode->list_next = dag_h->nodes;
215 dag_h->nodes = blockNode;
216
217 commitNode = rf_AllocDAGNode();
218 commitNode->list_next = dag_h->nodes;
219 dag_h->nodes = commitNode;
220
221 termNode = rf_AllocDAGNode();
222 termNode->list_next = dag_h->nodes;
223 dag_h->nodes = termNode;
224
225 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
226 if (nfaults == 2) {
227 wnqNode = rf_AllocDAGNode();
228 } else {
229 #endif
230 wnqNode = NULL;
231 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
232 }
233 #endif
234 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
235 new_asm_h, &nRodNodes, &sosBuffer,
236 &eosBuffer, allocList);
237 if (nRodNodes > 0) {
238 for (i = 0; i < nRodNodes; i++) {
239 tmpNode = rf_AllocDAGNode();
240 tmpNode->list_next = dag_h->nodes;
241 dag_h->nodes = tmpNode;
242 }
243 rodNodes = dag_h->nodes;
244 } else {
245 rodNodes = NULL;
246 }
247
248 /* begin node initialization */
249 if (nRodNodes > 0) {
250 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
251 rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0,
252 dag_h, "Nil", allocList);
253 } else {
254 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
255 rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0,
256 dag_h, "Nil", allocList);
257 }
258
259 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
260 rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0,
261 dag_h, "Cmt", allocList);
262 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
263 rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0,
264 dag_h, "Trm", allocList);
265
266 /* initialize the Rod nodes */
267 tmpNode = rodNodes;
268 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
269 if (new_asm_h[asmNum]) {
270 pda = new_asm_h[asmNum]->stripeMap->physInfo;
271 while (pda) {
272 rf_InitNode(tmpNode, rf_wait,
273 RF_FALSE, rf_DiskReadFunc,
274 rf_DiskReadUndoFunc,
275 rf_GenericWakeupFunc,
276 1, 1, 4, 0, dag_h,
277 "Rod", allocList);
278 tmpNode->params[0].p = pda;
279 tmpNode->params[1].p = pda->bufPtr;
280 tmpNode->params[2].v = parityStripeID;
281 tmpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
282 which_ru);
283 nodeNum++;
284 pda = pda->next;
285 tmpNode = tmpNode->list_next;
286 }
287 }
288 }
289 RF_ASSERT(nodeNum == nRodNodes);
290
291 /* initialize the wnd nodes */
292 pda = asmap->physInfo;
293 tmpNode = wndNodes;
294 for (i = 0; i < nWndNodes; i++) {
295 rf_InitNode(tmpNode, rf_wait, RF_FALSE,
296 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
297 rf_GenericWakeupFunc, 1, 1, 4, 0,
298 dag_h, "Wnd", allocList);
299 RF_ASSERT(pda != NULL);
300 tmpNode->params[0].p = pda;
301 tmpNode->params[1].p = pda->bufPtr;
302 tmpNode->params[2].v = parityStripeID;
303 tmpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
304 pda = pda->next;
305 tmpNode = tmpNode->list_next;
306 }
307
308 /* initialize the redundancy node */
309 if (nRodNodes > 0) {
310 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
311 rf_NullNodeUndoFunc, NULL, 1,
312 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1,
313 nfaults, dag_h, "Xr ", allocList);
314 } else {
315 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
316 rf_NullNodeUndoFunc, NULL, 1,
317 1, 2 * (nWndNodes + nRodNodes) + 1,
318 nfaults, dag_h, "Xr ", allocList);
319 }
320 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
321 tmpNode = wndNodes;
322 for (i = 0; i < nWndNodes; i++) {
323 /* pda */
324 xorNode->params[2 * i + 0] = tmpNode->params[0];
325 /* buf ptr */
326 xorNode->params[2 * i + 1] = tmpNode->params[1];
327 tmpNode = tmpNode->list_next;
328 }
329 tmpNode = rodNodes;
330 for (i = 0; i < nRodNodes; i++) {
331 /* pda */
332 xorNode->params[2 * (nWndNodes + i) + 0] = tmpNode->params[0];
333 /* buf ptr */
334 xorNode->params[2 * (nWndNodes + i) + 1] = tmpNode->params[1];
335 tmpNode = tmpNode->list_next;
336 }
337 /* xor node needs to get at RAID information */
338 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
339
340 /*
341 * Look for an Rod node that reads a complete SU. If none,
342 * alloc a buffer to receive the parity info. Note that we
343 * can't use a new data buffer because it will not have gotten
344 * written when the xor occurs. */
345 if (allowBufferRecycle) {
346 tmpNode = rodNodes;
347 for (i = 0; i < nRodNodes; i++) {
348 if (((RF_PhysDiskAddr_t *) tmpNode->params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
349 break;
350 tmpNode = tmpNode->list_next;
351 }
352 }
353 if ((!allowBufferRecycle) || (i == nRodNodes)) {
354 xorNode->results[0] = rf_AllocBuffer(raidPtr, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), allocList);
355 } else {
356 /* this works because the only way we get here is if
357 allowBufferRecycle is true and we went through the
358 above for loop, and exited via the break before
359 i==nRodNodes was true. That means tmpNode will
360 still point to a valid node -- the one we want for
361 here! */
362 xorNode->results[0] = tmpNode->params[1].p;
363 }
364
365 /* initialize the Wnp node */
366 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
367 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
368 dag_h, "Wnp", allocList);
369 wnpNode->params[0].p = asmap->parityInfo;
370 wnpNode->params[1].p = xorNode->results[0];
371 wnpNode->params[2].v = parityStripeID;
372 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
373 /* parityInfo must describe entire parity unit */
374 RF_ASSERT(asmap->parityInfo->next == NULL);
375
376 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
377 if (nfaults == 2) {
378 /*
379 * We never try to recycle a buffer for the Q calcuation
380 * in addition to the parity. This would cause two buffers
381 * to get smashed during the P and Q calculation, guaranteeing
382 * one would be wrong.
383 */
384 RF_MallocAndAdd(xorNode->results[1],
385 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
386 (void *), allocList);
387 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
388 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc,
389 1, 1, 4, 0, dag_h, "Wnq", allocList);
390 wnqNode->params[0].p = asmap->qInfo;
391 wnqNode->params[1].p = xorNode->results[1];
392 wnqNode->params[2].v = parityStripeID;
393 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
394 /* parityInfo must describe entire parity unit */
395 RF_ASSERT(asmap->parityInfo->next == NULL);
396 }
397 #endif
398 /*
399 * Connect nodes to form graph.
400 */
401
402 /* connect dag header to block node */
403 RF_ASSERT(blockNode->numAntecedents == 0);
404 dag_h->succedents[0] = blockNode;
405
406 if (nRodNodes > 0) {
407 /* connect the block node to the Rod nodes */
408 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
409 RF_ASSERT(xorNode->numAntecedents == nRodNodes);
410 tmpNode = rodNodes;
411 for (i = 0; i < nRodNodes; i++) {
412 RF_ASSERT(tmpNode.numAntecedents == 1);
413 blockNode->succedents[i] = tmpNode;
414 tmpNode->antecedents[0] = blockNode;
415 tmpNode->antType[0] = rf_control;
416
417 /* connect the Rod nodes to the Xor node */
418 RF_ASSERT(tmpNode.numSuccedents == 1);
419 tmpNode->succedents[0] = xorNode;
420 xorNode->antecedents[i] = tmpNode;
421 xorNode->antType[i] = rf_trueData;
422 tmpNode = tmpNode->list_next;
423 }
424 } else {
425 /* connect the block node to the Xor node */
426 RF_ASSERT(blockNode->numSuccedents == 1);
427 RF_ASSERT(xorNode->numAntecedents == 1);
428 blockNode->succedents[0] = xorNode;
429 xorNode->antecedents[0] = blockNode;
430 xorNode->antType[0] = rf_control;
431 }
432
433 /* connect the xor node to the commit node */
434 RF_ASSERT(xorNode->numSuccedents == 1);
435 RF_ASSERT(commitNode->numAntecedents == 1);
436 xorNode->succedents[0] = commitNode;
437 commitNode->antecedents[0] = xorNode;
438 commitNode->antType[0] = rf_control;
439
440 /* connect the commit node to the write nodes */
441 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
442 tmpNode = wndNodes;
443 for (i = 0; i < nWndNodes; i++) {
444 RF_ASSERT(wndNodes->numAntecedents == 1);
445 commitNode->succedents[i] = tmpNode;
446 tmpNode->antecedents[0] = commitNode;
447 tmpNode->antType[0] = rf_control;
448 tmpNode = tmpNode->list_next;
449 }
450 RF_ASSERT(wnpNode->numAntecedents == 1);
451 commitNode->succedents[nWndNodes] = wnpNode;
452 wnpNode->antecedents[0] = commitNode;
453 wnpNode->antType[0] = rf_trueData;
454 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
455 if (nfaults == 2) {
456 RF_ASSERT(wnqNode->numAntecedents == 1);
457 commitNode->succedents[nWndNodes + 1] = wnqNode;
458 wnqNode->antecedents[0] = commitNode;
459 wnqNode->antType[0] = rf_trueData;
460 }
461 #endif
462 /* connect the write nodes to the term node */
463 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
464 RF_ASSERT(termNode->numSuccedents == 0);
465 tmpNode = wndNodes;
466 for (i = 0; i < nWndNodes; i++) {
467 RF_ASSERT(wndNodes->numSuccedents == 1);
468 tmpNode->succedents[0] = termNode;
469 termNode->antecedents[i] = tmpNode;
470 termNode->antType[i] = rf_control;
471 tmpNode = tmpNode->list_next;
472 }
473 RF_ASSERT(wnpNode->numSuccedents == 1);
474 wnpNode->succedents[0] = termNode;
475 termNode->antecedents[nWndNodes] = wnpNode;
476 termNode->antType[nWndNodes] = rf_control;
477 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
478 if (nfaults == 2) {
479 RF_ASSERT(wnqNode->numSuccedents == 1);
480 wnqNode->succedents[0] = termNode;
481 termNode->antecedents[nWndNodes + 1] = wnqNode;
482 termNode->antType[nWndNodes + 1] = rf_control;
483 }
484 #endif
485 }
/******************************************************************************
 *
 * creates a DAG to perform a small-write operation (either raid 5 or pq),
 * which is as follows:
 *
 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
 *            \- Rod X      /     \----> Wnd [Und]-/
 *           [\- Rod X     /       \---> Wnd [Und]-/]
 *           [\- Roq -> Q /         \--> Wnq [Unq]-/]
 *
 * Rop = read old parity
 * Rod = read old data
 * Roq = read old "q"
 * Cmt = commit node
 * Und = unlock data disk
 * Unp = unlock parity disk
 * Unq = unlock q disk
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [ ] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              pfuncs    - list of parity generating functions
 *              qfuncs    - list of q generating functions
 *
 * A null qfuncs indicates single fault tolerant
 *****************************************************************************/

519 void
520 rf_CommonCreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
521 RF_DagHeader_t *dag_h, void *bp,
522 RF_RaidAccessFlags_t flags,
523 RF_AllocListElem_t *allocList,
524 const RF_RedFuncs_t *pfuncs,
525 const RF_RedFuncs_t *qfuncs)
526 {
527 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
528 RF_DagNode_t *tmpNode, *tmpreadDataNode, *tmpreadParityNode;
529 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode;
530 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
531 RF_DagNode_t *tmpxorNode, *tmpqNode, *tmpwriteDataNode, *tmpreadQNode;
532 RF_DagNode_t *tmpwriteParityNode;
533 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
534 RF_DagNode_t *tmpwriteQNode;
535 #endif
536 int i, j, nNodes, totalNumNodes;
537 RF_ReconUnitNum_t which_ru;
538 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
539 int (*qfunc) (RF_DagNode_t *);
540 int numDataNodes, numParityNodes;
541 RF_StripeNum_t parityStripeID;
542 RF_PhysDiskAddr_t *pda;
543 char *name, *qname;
544 long nfaults;
545
546 nfaults = qfuncs ? 2 : 1;
547
548 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
549 asmap->raidAddress, &which_ru);
550 pda = asmap->physInfo;
551 numDataNodes = asmap->numStripeUnitsAccessed;
552 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
553
554 #if RF_DEBUG_DAG
555 if (rf_dagDebug) {
556 printf("[Creating small-write DAG]\n");
557 }
558 #endif
559 RF_ASSERT(numDataNodes > 0);
560 dag_h->creator = "SmallWriteDAG";
561
562 dag_h->numCommitNodes = 1;
563 dag_h->numCommits = 0;
564 dag_h->numSuccedents = 1;
565
566 /*
567 * DAG creation occurs in four steps:
568 * 1. count the number of nodes in the DAG
569 * 2. create the nodes
570 * 3. initialize the nodes
571 * 4. connect the nodes
572 */
573
574 /*
575 * Step 1. compute number of nodes in the graph
576 */
577
578 /* number of nodes: a read and write for each data unit a
579 * redundancy computation node for each parity node (nfaults *
580 * nparity) a read and write for each parity unit a block and
581 * commit node (2) a terminate node if atomic RMW an unlock
582 * node for each data unit, redundancy unit */
583 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
584 + (nfaults * 2 * numParityNodes) + 3;
585 /*
586 * Step 2. create the nodes
587 */
588
589 blockNode = rf_AllocDAGNode();
590 blockNode->list_next = dag_h->nodes;
591 dag_h->nodes = blockNode;
592
593 commitNode = rf_AllocDAGNode();
594 commitNode->list_next = dag_h->nodes;
595 dag_h->nodes = commitNode;
596
597 for (i = 0; i < numDataNodes; i++) {
598 tmpNode = rf_AllocDAGNode();
599 tmpNode->list_next = dag_h->nodes;
600 dag_h->nodes = tmpNode;
601 }
602 readDataNodes = dag_h->nodes;
603
604 for (i = 0; i < numParityNodes; i++) {
605 tmpNode = rf_AllocDAGNode();
606 tmpNode->list_next = dag_h->nodes;
607 dag_h->nodes = tmpNode;
608 }
609 readParityNodes = dag_h->nodes;
610
611 for (i = 0; i < numDataNodes; i++) {
612 tmpNode = rf_AllocDAGNode();
613 tmpNode->list_next = dag_h->nodes;
614 dag_h->nodes = tmpNode;
615 }
616 writeDataNodes = dag_h->nodes;
617
618 for (i = 0; i < numParityNodes; i++) {
619 tmpNode = rf_AllocDAGNode();
620 tmpNode->list_next = dag_h->nodes;
621 dag_h->nodes = tmpNode;
622 }
623 writeParityNodes = dag_h->nodes;
624
625 for (i = 0; i < numParityNodes; i++) {
626 tmpNode = rf_AllocDAGNode();
627 tmpNode->list_next = dag_h->nodes;
628 dag_h->nodes = tmpNode;
629 }
630 xorNodes = dag_h->nodes;
631
632 termNode = rf_AllocDAGNode();
633 termNode->list_next = dag_h->nodes;
634 dag_h->nodes = termNode;
635
636 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
637 if (nfaults == 2) {
638 for (i = 0; i < numParityNodes; i++) {
639 tmpNode = rf_AllocDAGNode();
640 tmpNode->list_next = dag_h->nodes;
641 dag_h->nodes = tmpNode;
642 }
643 readQNodes = dag_h->nodes;
644
645 for (i = 0; i < numParityNodes; i++) {
646 tmpNode = rf_AllocDAGNode();
647 tmpNode->list_next = dag_h->nodes;
648 dag_h->nodes = tmpNode;
649 }
650 writeQNodes = dag_h->nodes;
651
652 for (i = 0; i < numParityNodes; i++) {
653 tmpNode = rf_AllocDAGNode();
654 tmpNode->list_next = dag_h->nodes;
655 dag_h->nodes = tmpNode;
656 }
657 qNodes = dag_h->nodes;
658 } else {
659 #endif
660 readQNodes = writeQNodes = qNodes = NULL;
661 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
662 }
663 #endif
664 RF_ASSERT(i == totalNumNodes);
665
666 /*
667 * Step 3. initialize the nodes
668 */
669 /* initialize block node (Nil) */
670 nNodes = numDataNodes + (nfaults * numParityNodes);
671 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
672 rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0,
673 dag_h, "Nil", allocList);
674
675 /* initialize commit node (Cmt) */
676 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
677 rf_NullNodeUndoFunc, NULL, nNodes,
678 (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);
679
680 /* initialize terminate node (Trm) */
681 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
682 rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0,
683 dag_h, "Trm", allocList);
684
685 /* initialize nodes which read old data (Rod) */
686 tmpreadDataNode = readDataNodes;
687 for (i = 0; i < numDataNodes; i++) {
688 rf_InitNode(tmpreadDataNode, rf_wait, RF_FALSE,
689 rf_DiskReadFunc, rf_DiskReadUndoFunc,
690 rf_GenericWakeupFunc, (nfaults * numParityNodes),
691 1, 4, 0, dag_h, "Rod", allocList);
692 RF_ASSERT(pda != NULL);
693 /* physical disk addr desc */
694 tmpreadDataNode->params[0].p = pda;
695 /* buffer to hold old data */
696 tmpreadDataNode->params[1].p = rf_AllocBuffer(raidPtr, pda->numSector << raidPtr->logBytesPerSector, allocList);
697 tmpreadDataNode->params[2].v = parityStripeID;
698 tmpreadDataNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
699 which_ru);
700 pda = pda->next;
701 for (j = 0; j < tmpreadDataNode->numSuccedents; j++) {
702 tmpreadDataNode->propList[j] = NULL;
703 }
704 tmpreadDataNode = tmpreadDataNode->list_next;
705 }
706
707 /* initialize nodes which read old parity (Rop) */
708 pda = asmap->parityInfo;
709 i = 0;
710 tmpreadParityNode = readParityNodes;
711 for (i = 0; i < numParityNodes; i++) {
712 RF_ASSERT(pda != NULL);
713 rf_InitNode(tmpreadParityNode, rf_wait, RF_FALSE,
714 rf_DiskReadFunc, rf_DiskReadUndoFunc,
715 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0,
716 dag_h, "Rop", allocList);
717 tmpreadParityNode->params[0].p = pda;
718 /* buffer to hold old parity */
719 tmpreadParityNode->params[1].p = rf_AllocBuffer(raidPtr, pda->numSector << raidPtr->logBytesPerSector, allocList);
720 tmpreadParityNode->params[2].v = parityStripeID;
721 tmpreadParityNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
722 which_ru);
723 pda = pda->next;
724 for (j = 0; j < tmpreadParityNode->numSuccedents; j++) {
725 tmpreadParityNode->propList[0] = NULL;
726 }
727 tmpreadParityNode = tmpreadParityNode->list_next;
728 }
729
730 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
731 /* initialize nodes which read old Q (Roq) */
732 if (nfaults == 2) {
733 pda = asmap->qInfo;
734 tmpreadQNode = readQNodes;
735 for (i = 0; i < numParityNodes; i++) {
736 RF_ASSERT(pda != NULL);
737 rf_InitNode(tmpreadQNode, rf_wait, RF_FALSE,
738 rf_DiskReadFunc, rf_DiskReadUndoFunc,
739 rf_GenericWakeupFunc, numParityNodes,
740 1, 4, 0, dag_h, "Roq", allocList);
741 tmpreadQNode->params[0].p = pda;
742 /* buffer to hold old Q */
743 tmpreadQNode->params[1].p = rf_AllocBuffer(raidPtr, dag_h,
744 pda->numSector << raidPtr->logBytesPerSector);
745 tmpreadQNode->params[2].v = parityStripeID;
746 tmpreadQNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
747 which_ru);
748 pda = pda->next;
749 for (j = 0; j < tmpreadQNode->numSuccedents; j++) {
750 tmpreadQNode->propList[0] = NULL;
751 }
752 tmpreadQNode = tmpreadQNode->list_next;
753 }
754 }
755 #endif
756 /* initialize nodes which write new data (Wnd) */
757 pda = asmap->physInfo;
758 tmpwriteDataNode = writeDataNodes;
759 for (i = 0; i < numDataNodes; i++) {
760 RF_ASSERT(pda != NULL);
761 rf_InitNode(tmpwriteDataNode, rf_wait, RF_FALSE,
762 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
763 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
764 "Wnd", allocList);
765 /* physical disk addr desc */
766 tmpwriteDataNode->params[0].p = pda;
767 /* buffer holding new data to be written */
768 tmpwriteDataNode->params[1].p = pda->bufPtr;
769 tmpwriteDataNode->params[2].v = parityStripeID;
770 tmpwriteDataNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
771 which_ru);
772 pda = pda->next;
773 tmpwriteDataNode = tmpwriteDataNode->list_next;
774 }
775
776 /*
777 * Initialize nodes which compute new parity and Q.
778 */
779 /*
780 * We use the simple XOR func in the double-XOR case, and when
781 * we're accessing only a portion of one stripe unit. The
782 * distinction between the two is that the regular XOR func
783 * assumes that the targbuf is a full SU in size, and examines
784 * the pda associated with the buffer to decide where within
785 * the buffer to XOR the data, whereas the simple XOR func
786 * just XORs the data into the start of the buffer. */
787 if ((numParityNodes == 2) || ((numDataNodes == 1)
788 && (asmap->totalSectorsAccessed <
789 raidPtr->Layout.sectorsPerStripeUnit))) {
790 func = pfuncs->simple;
791 undoFunc = rf_NullNodeUndoFunc;
792 name = pfuncs->SimpleName;
793 if (qfuncs) {
794 qfunc = qfuncs->simple;
795 qname = qfuncs->SimpleName;
796 } else {
797 qfunc = NULL;
798 qname = NULL;
799 }
800 } else {
801 func = pfuncs->regular;
802 undoFunc = rf_NullNodeUndoFunc;
803 name = pfuncs->RegularName;
804 if (qfuncs) {
805 qfunc = qfuncs->regular;
806 qname = qfuncs->RegularName;
807 } else {
808 qfunc = NULL;
809 qname = NULL;
810 }
811 }
812 /*
813 * Initialize the xor nodes: params are {pda,buf}
814 * from {Rod,Wnd,Rop} nodes, and raidPtr
815 */
816 if (numParityNodes == 2) {
817 /* double-xor case */
818 tmpxorNode = xorNodes;
819 tmpreadDataNode = readDataNodes;
820 tmpreadParityNode = readParityNodes;
821 tmpwriteDataNode = writeDataNodes;
822 tmpqNode = qNodes;
823 tmpreadQNode = readQNodes;
824 for (i = 0; i < numParityNodes; i++) {
825 /* note: no wakeup func for xor */
826 rf_InitNode(tmpxorNode, rf_wait, RF_FALSE, func,
827 undoFunc, NULL, 1,
828 (numDataNodes + numParityNodes),
829 7, 1, dag_h, name, allocList);
830 tmpxorNode->flags |= RF_DAGNODE_FLAG_YIELD;
831 tmpxorNode->params[0] = tmpreadDataNode->params[0];
832 tmpxorNode->params[1] = tmpreadDataNode->params[1];
833 tmpxorNode->params[2] = tmpreadParityNode->params[0];
834 tmpxorNode->params[3] = tmpreadParityNode->params[1];
835 tmpxorNode->params[4] = tmpwriteDataNode->params[0];
836 tmpxorNode->params[5] = tmpwriteDataNode->params[1];
837 tmpxorNode->params[6].p = raidPtr;
838 /* use old parity buf as target buf */
839 tmpxorNode->results[0] = tmpreadParityNode->params[1].p;
840 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
841 if (nfaults == 2) {
842 /* note: no wakeup func for qor */
843 rf_InitNode(tmpqNode, rf_wait, RF_FALSE,
844 qfunc, undoFunc, NULL, 1,
845 (numDataNodes + numParityNodes),
846 7, 1, dag_h, qname, allocList);
847 tmpqNode->params[0] = tmpreadDataNode->params[0];
848 tmpqNode->params[1] = tmpreadDataNode->params[1];
849 tmpqNode->params[2] = tmpreadQNode->.params[0];
850 tmpqNode->params[3] = tmpreadQNode->params[1];
851 tmpqNode->params[4] = tmpwriteDataNode->params[0];
852 tmpqNode->params[5] = tmpwriteDataNode->params[1];
853 tmpqNode->params[6].p = raidPtr;
854 /* use old Q buf as target buf */
855 tmpqNode->results[0] = tmpreadQNode->params[1].p;
856 tmpqNode = tmpqNode->list_next;
857 tmpreadQNodes = tmpreadQNodes->list_next;
858 }
859 #endif
860 tmpxorNode = tmpxorNode->list_next;
861 tmpreadDataNode = tmpreadDataNode->list_next;
862 tmpreadParityNode = tmpreadParityNode->list_next;
863 tmpwriteDataNode = tmpwriteDataNode->list_next;
864 }
865 } else {
866 /* there is only one xor node in this case */
867 rf_InitNode(xorNodes, rf_wait, RF_FALSE, func,
868 undoFunc, NULL, 1, (numDataNodes + numParityNodes),
869 (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
870 dag_h, name, allocList);
871 xorNodes->flags |= RF_DAGNODE_FLAG_YIELD;
872 tmpreadDataNode = readDataNodes;
873 for (i = 0; i < numDataNodes; i++) { /* used to be"numDataNodes + 1" until we factored
874 out the "+1" into the "deal with Rop separately below */
875 /* set up params related to Rod nodes */
876 xorNodes->params[2 * i + 0] = tmpreadDataNode->params[0]; /* pda */
877 xorNodes->params[2 * i + 1] = tmpreadDataNode->params[1]; /* buffer ptr */
878 tmpreadDataNode = tmpreadDataNode->list_next;
879 }
880 /* deal with Rop separately */
881 xorNodes->params[2 * numDataNodes + 0] = readParityNodes->params[0]; /* pda */
882 xorNodes->params[2 * numDataNodes + 1] = readParityNodes->params[1]; /* buffer ptr */
883
884 tmpwriteDataNode = writeDataNodes;
885 for (i = 0; i < numDataNodes; i++) {
886 /* set up params related to Wnd and Wnp nodes */
887 xorNodes->params[2 * (numDataNodes + 1 + i) + 0] = /* pda */
888 tmpwriteDataNode->params[0];
889 xorNodes->params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */
890 tmpwriteDataNode->params[1];
891 tmpwriteDataNode = tmpwriteDataNode->list_next;
892 }
893 /* xor node needs to get at RAID information */
894 xorNodes->params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
895 xorNodes->results[0] = readParityNodes->params[1].p;
896 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
897 if (nfaults == 2) {
898 rf_InitNode(qNodes, rf_wait, RF_FALSE, qfunc,
899 undoFunc, NULL, 1,
900 (numDataNodes + numParityNodes),
901 (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
902 dag_h, qname, allocList);
903 tmpreadDataNode = readDataNodes;
904 for (i = 0; i < numDataNodes; i++) {
905 /* set up params related to Rod */
906 qNodes->params[2 * i + 0] = tmpreadDataNode->params[0]; /* pda */
907 qNodes->params[2 * i + 1] = tmpreadDataNode->params[1]; /* buffer ptr */
908 tmpreadDataNode = tmpreadDataNode->list_next;
909 }
910 /* and read old q */
911 qNodes->params[2 * numDataNodes + 0] = /* pda */
912 readQNodes->params[0];
913 qNodes->params[2 * numDataNodes + 1] = /* buffer ptr */
914 readQNodes->params[1];
915 tmpwriteDataNode = writeDataNodes;
916 for (i = 0; i < numDataNodes; i++) {
917 /* set up params related to Wnd nodes */
918 qNodes->params[2 * (numDataNodes + 1 + i) + 0] = /* pda */
919 tmpwriteDataNode->params[0];
920 qNodes->params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */
921 tmpwriteDataNode->params[1];
922 tmpwriteDataNode = tmpwriteDataNode->list_next;
923 }
924 			/* q node needs to get at RAID information */
925 qNodes->params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
926 qNodes->results[0] = readQNodes->params[1].p;
927 }
928 #endif
929 }
930
931 /* initialize nodes which write new parity (Wnp) */
932 pda = asmap->parityInfo;
933 tmpwriteParityNode = writeParityNodes;
934 tmpxorNode = xorNodes;
935 for (i = 0; i < numParityNodes; i++) {
936 rf_InitNode(tmpwriteParityNode, rf_wait, RF_FALSE,
937 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
938 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
939 "Wnp", allocList);
940 RF_ASSERT(pda != NULL);
941 tmpwriteParityNode->params[0].p = pda; /* param 1 (bufPtr)
942 * filled in by xor node */
943 tmpwriteParityNode->params[1].p = tmpxorNode->results[0]; /* buffer pointer for
944 * parity write
945 * operation */
946 tmpwriteParityNode->params[2].v = parityStripeID;
947 tmpwriteParityNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
948 which_ru);
949 pda = pda->next;
950 tmpwriteParityNode = tmpwriteParityNode->list_next;
951 tmpxorNode = tmpxorNode->list_next;
952 }
953
954 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
955 /* initialize nodes which write new Q (Wnq) */
956 if (nfaults == 2) {
957 pda = asmap->qInfo;
958 tmpwriteQNode = writeQNodes;
959 tmpqNode = qNodes;
960 for (i = 0; i < numParityNodes; i++) {
961 rf_InitNode(tmpwriteQNode, rf_wait, RF_FALSE,
962 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
963 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
964 "Wnq", allocList);
965 RF_ASSERT(pda != NULL);
966 tmpwriteQNode->params[0].p = pda; /* param 1 (bufPtr)
967 * filled in by xor node */
968 tmpwriteQNode->params[1].p = tmpqNode->results[0]; /* buffer pointer for
969 * parity write
970 * operation */
971 tmpwriteQNode->params[2].v = parityStripeID;
972 tmpwriteQNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
973 which_ru);
974 pda = pda->next;
975 tmpwriteQNode = tmpwriteQNode->list_next;
976 tmpqNode = tmpqNode->list_next;
977 }
978 }
979 #endif
980 /*
981 * Step 4. connect the nodes.
982 */
983
984 /* connect header to block node */
985 dag_h->succedents[0] = blockNode;
986
987 /* connect block node to read old data nodes */
988 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
989 tmpreadDataNode = readDataNodes;
990 for (i = 0; i < numDataNodes; i++) {
991 blockNode->succedents[i] = tmpreadDataNode;
992 RF_ASSERT(tmpreadDataNode->numAntecedents == 1);
993 tmpreadDataNode->antecedents[0] = blockNode;
994 tmpreadDataNode->antType[0] = rf_control;
995 tmpreadDataNode = tmpreadDataNode->list_next;
996 }
997
998 /* connect block node to read old parity nodes */
999 tmpreadParityNode = readParityNodes;
1000 for (i = 0; i < numParityNodes; i++) {
1001 blockNode->succedents[numDataNodes + i] = tmpreadParityNode;
1002 RF_ASSERT(tmpreadParityNode->numAntecedents == 1);
1003 tmpreadParityNode->antecedents[0] = blockNode;
1004 tmpreadParityNode->antType[0] = rf_control;
1005 tmpreadParityNode = tmpreadParityNode->list_next;
1006 }
1007
1008 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
1009 /* connect block node to read old Q nodes */
1010 if (nfaults == 2) {
1011 tmpreadQNode = readQNodes;
1012 for (i = 0; i < numParityNodes; i++) {
1013 blockNode->succedents[numDataNodes + numParityNodes + i] = tmpreadQNode;
1014 RF_ASSERT(tmpreadQNode->numAntecedents == 1);
1015 tmpreadQNode->antecedents[0] = blockNode;
1016 tmpreadQNode->antType[0] = rf_control;
1017 tmpreadQNode = tmpreadQNode->list_next;
1018 }
1019 }
1020 #endif
1021 /* connect read old data nodes to xor nodes */
1022 tmpreadDataNode = readDataNodes;
1023 for (i = 0; i < numDataNodes; i++) {
1024 RF_ASSERT(tmpreadDataNode->numSuccedents == (nfaults * numParityNodes));
1025 tmpxorNode = xorNodes;
1026 for (j = 0; j < numParityNodes; j++) {
1027 RF_ASSERT(tmpxorNode->numAntecedents == numDataNodes + numParityNodes);
1028 tmpreadDataNode->succedents[j] = tmpxorNode;
1029 tmpxorNode->antecedents[i] = tmpreadDataNode;
1030 tmpxorNode->antType[i] = rf_trueData;
1031 tmpxorNode = tmpxorNode->list_next;
1032 }
1033 tmpreadDataNode = tmpreadDataNode->list_next;
1034 }
1035
1036 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
1037 /* connect read old data nodes to q nodes */
1038 if (nfaults == 2) {
1039 tmpreadDataNode = readDataNodes;
1040 for (i = 0; i < numDataNodes; i++) {
1041 tmpqNode = qNodes;
1042 for (j = 0; j < numParityNodes; j++) {
1043 RF_ASSERT(tmpqNode->numAntecedents == numDataNodes + numParityNodes);
1044 tmpreadDataNode->succedents[numParityNodes + j] = tmpqNode;
1045 tmpqNode->antecedents[i] = tmpreadDataNode;
1046 tmpqNode->antType[i] = rf_trueData;
1047 tmpqNode = tmpqNode->list_next;
1048 }
1049 tmpreadDataNode = tmpreadDataNode->list_next;
1050 }
1051 }
1052 #endif
1053 /* connect read old parity nodes to xor nodes */
1054 tmpreadParityNode = readParityNodes;
1055 for (i = 0; i < numParityNodes; i++) {
1056 RF_ASSERT(tmpreadParityNode->numSuccedents == numParityNodes);
1057 tmpxorNode = xorNodes;
1058 for (j = 0; j < numParityNodes; j++) {
1059 tmpreadParityNode->succedents[j] = tmpxorNode;
1060 tmpxorNode->antecedents[numDataNodes + i] = tmpreadParityNode;
1061 tmpxorNode->antType[numDataNodes + i] = rf_trueData;
1062 tmpxorNode = tmpxorNode->list_next;
1063 }
1064 tmpreadParityNode = tmpreadParityNode->list_next;
1065 }
1066
1067 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
1068 /* connect read old q nodes to q nodes */
1069 if (nfaults == 2) {
1070 tmpreadParityNode = readParityNodes;
1071 tmpreadQNode = readQNodes;
1072 for (i = 0; i < numParityNodes; i++) {
1073 RF_ASSERT(tmpreadParityNode->numSuccedents == numParityNodes);
1074 tmpqNode = qNodes;
1075 for (j = 0; j < numParityNodes; j++) {
1076 tmpreadQNode->succedents[j] = tmpqNode;
1077 tmpqNode->antecedents[numDataNodes + i] = tmpreadQNodes;
1078 tmpqNode->antType[numDataNodes + i] = rf_trueData;
1079 tmpqNode = tmpqNode->list_next;
1080 }
1081 tmpreadParityNode = tmpreadParityNode->list_next;
1082 tmpreadQNode = tmpreadQNode->list_next;
1083 }
1084 }
1085 #endif
1086 /* connect xor nodes to commit node */
1087 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
1088 tmpxorNode = xorNodes;
1089 for (i = 0; i < numParityNodes; i++) {
1090 RF_ASSERT(tmpxorNode->numSuccedents == 1);
1091 tmpxorNode->succedents[0] = commitNode;
1092 commitNode->antecedents[i] = tmpxorNode;
1093 commitNode->antType[i] = rf_control;
1094 tmpxorNode = tmpxorNode->list_next;
1095 }
1096
1097 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
1098 /* connect q nodes to commit node */
1099 if (nfaults == 2) {
1100 tmpqNode = qNodes;
1101 for (i = 0; i < numParityNodes; i++) {
1102 RF_ASSERT(tmpqNode->numSuccedents == 1);
1103 tmpqNode->succedents[0] = commitNode;
1104 commitNode->antecedents[i + numParityNodes] = tmpqNode;
1105 commitNode->antType[i + numParityNodes] = rf_control;
1106 tmpqNode = tmpqNode->list_next;
1107 }
1108 }
1109 #endif
1110 /* connect commit node to write nodes */
1111 RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
1112 tmpwriteDataNode = writeDataNodes;
1113 for (i = 0; i < numDataNodes; i++) {
1114 RF_ASSERT(tmpwriteDataNodes->numAntecedents == 1);
1115 commitNode->succedents[i] = tmpwriteDataNode;
1116 tmpwriteDataNode->antecedents[0] = commitNode;
1117 tmpwriteDataNode->antType[0] = rf_trueData;
1118 tmpwriteDataNode = tmpwriteDataNode->list_next;
1119 }
1120 tmpwriteParityNode = writeParityNodes;
1121 for (i = 0; i < numParityNodes; i++) {
1122 RF_ASSERT(tmpwriteParityNode->numAntecedents == 1);
1123 commitNode->succedents[i + numDataNodes] = tmpwriteParityNode;
1124 tmpwriteParityNode->antecedents[0] = commitNode;
1125 tmpwriteParityNode->antType[0] = rf_trueData;
1126 tmpwriteParityNode = tmpwriteParityNode->list_next;
1127 }
1128 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
1129 if (nfaults == 2) {
1130 tmpwriteQNode = writeQNodes;
1131 for (i = 0; i < numParityNodes; i++) {
1132 RF_ASSERT(tmpwriteQNode->numAntecedents == 1);
1133 commitNode->succedents[i + numDataNodes + numParityNodes] = tmpwriteQNode;
1134 tmpwriteQNode->antecedents[0] = commitNode;
1135 tmpwriteQNode->antType[0] = rf_trueData;
1136 tmpwriteQNode = tmpwriteQNode->list_next;
1137 }
1138 }
1139 #endif
1140 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1141 RF_ASSERT(termNode->numSuccedents == 0);
1142 tmpwriteDataNode = writeDataNodes;
1143 for (i = 0; i < numDataNodes; i++) {
1144 /* connect write new data nodes to term node */
1145 RF_ASSERT(tmpwriteDataNode->numSuccedents == 1);
1146 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1147 tmpwriteDataNode->succedents[0] = termNode;
1148 termNode->antecedents[i] = tmpwriteDataNode;
1149 termNode->antType[i] = rf_control;
1150 tmpwriteDataNode = tmpwriteDataNode->list_next;
1151 }
1152
1153 tmpwriteParityNode = writeParityNodes;
1154 for (i = 0; i < numParityNodes; i++) {
1155 RF_ASSERT(tmpwriteParityNode->numSuccedents == 1);
1156 tmpwriteParityNode->succedents[0] = termNode;
1157 termNode->antecedents[numDataNodes + i] = tmpwriteParityNode;
1158 termNode->antType[numDataNodes + i] = rf_control;
1159 tmpwriteParityNode = tmpwriteParityNode->list_next;
1160 }
1161
1162 #if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
1163 if (nfaults == 2) {
1164 tmpwriteQNode = writeQNodes;
1165 for (i = 0; i < numParityNodes; i++) {
1166 RF_ASSERT(tmpwriteQNode->numSuccedents == 1);
1167 tmpwriteQNode->succedents[0] = termNode;
1168 termNode->antecedents[numDataNodes + numParityNodes + i] = tmpwriteQNode;
1169 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1170 tmpwriteQNode = tmpwriteQNode->list_next;
1171 }
1172 }
1173 #endif
1174 }
1175
1176
1177 /******************************************************************************
1178 * create a write graph (fault-free or degraded) for RAID level 1
1179 *
1180 * Hdr -> Commit -> Wpd -> Nil -> Trm
1181 * -> Wsd ->
1182 *
1183 * The "Wpd" node writes data to the primary copy in the mirror pair
1184 * The "Wsd" node writes data to the secondary copy in the mirror pair
1185 *
1186 * Parameters: raidPtr - description of the physical array
1187 * asmap - logical & physical addresses for this access
1188 * bp - buffer ptr (holds write data)
1189 * flags - general flags (e.g. disk locking)
1190 * allocList - list of memory allocated in DAG creation
1191 *****************************************************************************/
1192
1193 void
1194 rf_CreateRaidOneWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1195 RF_DagHeader_t *dag_h, void *bp,
1196 RF_RaidAccessFlags_t flags,
1197 RF_AllocListElem_t *allocList)
1198 {
1199 RF_DagNode_t *unblockNode, *termNode, *commitNode;
1200 RF_DagNode_t *wndNode, *wmirNode;
1201 RF_DagNode_t *tmpNode, *tmpwndNode, *tmpwmirNode;
1202 int nWndNodes, nWmirNodes, i;
1203 RF_ReconUnitNum_t which_ru;
1204 RF_PhysDiskAddr_t *pda, *pdaP;
1205 RF_StripeNum_t parityStripeID;
1206
1207 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1208 asmap->raidAddress, &which_ru);
1209 #if RF_DEBUG_DAG
1210 if (rf_dagDebug) {
1211 printf("[Creating RAID level 1 write DAG]\n");
1212 }
1213 #endif
1214 dag_h->creator = "RaidOneWriteDAG";
1215
1216 /* 2 implies access not SU aligned */
1217 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
1218 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
1219
1220 /* alloc the Wnd nodes and the Wmir node */
1221 if (asmap->numDataFailed == 1)
1222 nWndNodes--;
1223 if (asmap->numParityFailed == 1)
1224 nWmirNodes--;
1225
1226 /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock
1227 * + terminator) */
1228 for (i = 0; i < nWndNodes; i++) {
1229 tmpNode = rf_AllocDAGNode();
1230 tmpNode->list_next = dag_h->nodes;
1231 dag_h->nodes = tmpNode;
1232 }
1233 wndNode = dag_h->nodes;
1234
1235 for (i = 0; i < nWmirNodes; i++) {
1236 tmpNode = rf_AllocDAGNode();
1237 tmpNode->list_next = dag_h->nodes;
1238 dag_h->nodes = tmpNode;
1239 }
1240 wmirNode = dag_h->nodes;
1241
1242 commitNode = rf_AllocDAGNode();
1243 commitNode->list_next = dag_h->nodes;
1244 dag_h->nodes = commitNode;
1245
1246 unblockNode = rf_AllocDAGNode();
1247 unblockNode->list_next = dag_h->nodes;
1248 dag_h->nodes = unblockNode;
1249
1250 termNode = rf_AllocDAGNode();
1251 termNode->list_next = dag_h->nodes;
1252 dag_h->nodes = termNode;
1253
1254 /* this dag can commit immediately */
1255 dag_h->numCommitNodes = 1;
1256 dag_h->numCommits = 0;
1257 dag_h->numSuccedents = 1;
1258
1259 /* initialize the commit, unblock, and term nodes */
1260 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
1261 rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes),
1262 0, 0, 0, dag_h, "Cmt", allocList);
1263 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1264 rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes),
1265 0, 0, dag_h, "Nil", allocList);
1266 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1267 rf_TerminateUndoFunc, NULL, 0, 1, 0, 0,
1268 dag_h, "Trm", allocList);
1269
1270 /* initialize the wnd nodes */
1271 if (nWndNodes > 0) {
1272 pda = asmap->physInfo;
1273 tmpwndNode = wndNode;
1274 for (i = 0; i < nWndNodes; i++) {
1275 rf_InitNode(tmpwndNode, rf_wait, RF_FALSE,
1276 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1277 rf_GenericWakeupFunc, 1, 1, 4, 0,
1278 dag_h, "Wpd", allocList);
1279 RF_ASSERT(pda != NULL);
1280 tmpwndNode->params[0].p = pda;
1281 tmpwndNode->params[1].p = pda->bufPtr;
1282 tmpwndNode->params[2].v = parityStripeID;
1283 tmpwndNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
1284 pda = pda->next;
1285 tmpwndNode = tmpwndNode->list_next;
1286 }
1287 RF_ASSERT(pda == NULL);
1288 }
1289 /* initialize the mirror nodes */
1290 if (nWmirNodes > 0) {
1291 pda = asmap->physInfo;
1292 pdaP = asmap->parityInfo;
1293 tmpwmirNode = wmirNode;
1294 for (i = 0; i < nWmirNodes; i++) {
1295 rf_InitNode(tmpwmirNode, rf_wait, RF_FALSE,
1296 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1297 rf_GenericWakeupFunc, 1, 1, 4, 0,
1298 dag_h, "Wsd", allocList);
1299 RF_ASSERT(pda != NULL);
1300 tmpwmirNode->params[0].p = pdaP;
1301 tmpwmirNode->params[1].p = pda->bufPtr;
1302 tmpwmirNode->params[2].v = parityStripeID;
1303 tmpwmirNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru);
1304 pda = pda->next;
1305 pdaP = pdaP->next;
1306 tmpwmirNode = tmpwmirNode->list_next;
1307 }
1308 RF_ASSERT(pda == NULL);
1309 RF_ASSERT(pdaP == NULL);
1310 }
1311 /* link the header node to the commit node */
1312 RF_ASSERT(dag_h->numSuccedents == 1);
1313 RF_ASSERT(commitNode->numAntecedents == 0);
1314 dag_h->succedents[0] = commitNode;
1315
1316 /* link the commit node to the write nodes */
1317 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
1318 tmpwndNode = wndNode;
1319 for (i = 0; i < nWndNodes; i++) {
1320 RF_ASSERT(tmpwndNode->numAntecedents == 1);
1321 commitNode->succedents[i] = tmpwndNode;
1322 tmpwndNode->antecedents[0] = commitNode;
1323 tmpwndNode->antType[0] = rf_control;
1324 tmpwndNode = tmpwndNode->list_next;
1325 }
1326 tmpwmirNode = wmirNode;
1327 for (i = 0; i < nWmirNodes; i++) {
1328 RF_ASSERT(tmpwmirNode->numAntecedents == 1);
1329 commitNode->succedents[i + nWndNodes] = tmpwmirNode;
1330 tmpwmirNode->antecedents[0] = commitNode;
1331 tmpwmirNode->antType[0] = rf_control;
1332 tmpwmirNode = tmpwmirNode->list_next;
1333 }
1334
1335 /* link the write nodes to the unblock node */
1336 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
1337 tmpwndNode = wndNode;
1338 for (i = 0; i < nWndNodes; i++) {
1339 RF_ASSERT(tmpwndNode->numSuccedents == 1);
1340 tmpwndNode->succedents[0] = unblockNode;
1341 unblockNode->antecedents[i] = tmpwndNode;
1342 unblockNode->antType[i] = rf_control;
1343 tmpwndNode = tmpwndNode->list_next;
1344 }
1345 tmpwmirNode = wmirNode;
1346 for (i = 0; i < nWmirNodes; i++) {
1347 RF_ASSERT(tmpwmirNode->numSuccedents == 1);
1348 tmpwmirNode->succedents[0] = unblockNode;
1349 unblockNode->antecedents[i + nWndNodes] = tmpwmirNode;
1350 unblockNode->antType[i + nWndNodes] = rf_control;
1351 tmpwmirNode = tmpwmirNode->list_next;
1352 }
1353
1354 /* link the unblock node to the term node */
1355 RF_ASSERT(unblockNode->numSuccedents == 1);
1356 RF_ASSERT(termNode->numAntecedents == 1);
1357 RF_ASSERT(termNode->numSuccedents == 0);
1358 unblockNode->succedents[0] = termNode;
1359 termNode->antecedents[0] = unblockNode;
1360 termNode->antType[0] = rf_control;
1361 }
1362