1 /* $NetBSD: rf_dagffwr.c,v 1.3 1999/02/05 00:06:07 oster Exp $ */
2 /*
3 * Copyright (c) 1995 Carnegie-Mellon University.
4 * All rights reserved.
5 *
6 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
7 *
8 * Permission to use, copy, modify and distribute this software and
9 * its documentation is hereby granted, provided that both the copyright
10 * notice and this permission notice appear in all copies of the
11 * software, derivative works or modified versions, and any portions
12 * thereof, and that both notices appear in supporting documentation.
13 *
14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
17 *
18 * Carnegie Mellon requests users of this software to return to
19 *
20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
21 * School of Computer Science
22 * Carnegie Mellon University
23 * Pittsburgh PA 15213-3890
24 *
25 * any improvements or extensions that they make and grant Carnegie the
26 * rights to redistribute these changes.
27 */
28
29 /*
30 * rf_dagffwr.c
31 *
32 * code for creating fault-free DAGs
33 *
34 */
35
36 #include "rf_types.h"
37 #include "rf_raid.h"
38 #include "rf_dag.h"
39 #include "rf_dagutils.h"
40 #include "rf_dagfuncs.h"
41 #include "rf_threadid.h"
42 #include "rf_debugMem.h"
43 #include "rf_dagffrd.h"
44 #include "rf_memchunk.h"
45 #include "rf_general.h"
46 #include "rf_dagffwr.h"
47
48 /******************************************************************************
49 *
50 * General comments on DAG creation:
51 *
52 * All DAGs in this file use roll-away error recovery. Each DAG has a single
53 * commit node, usually called "Cmt." If an error occurs before the Cmt node
54 * is reached, the execution engine will halt forward execution and work
55 * backward through the graph, executing the undo functions. Assuming that
56 * each node prior to the Cmt node is either undoable and atomic, or makes
57 * no changes to permanent state, the graph will fail atomically.
58 * If an error occurs after the Cmt node executes, the engine will roll-forward
59 * through the graph, blindly executing nodes until it reaches the end.
60 * If a graph reaches the end, it is assumed to have completed successfully.
61 *
62 * A graph has only 1 Cmt node.
63 *
64 */
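/*
 * Editorial illustration: the sketch below (guarded out with #if 0, and not
 * part of RAIDframe) shows the decision the execution engine makes around
 * the single commit point described above.  The function and variable names
 * are hypothetical; the real logic lives in the DAG execution engine.
 */
#if 0
static void
example_rollaway_decision(int commit_node_fired, int node_failed)
{
	if (!node_failed)
		return;		/* keep executing forward as usual */
	if (!commit_node_fired) {
		/* roll backward: execute the undo functions of the nodes that
		 * completed, so the whole access fails atomically */
	} else {
		/* roll forward: blindly execute the remaining nodes;
		 * reaching the end of the graph counts as success */
	}
}
#endif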
65
66
67 /******************************************************************************
68 *
69 * The following wrappers map the standard DAG creation interface to the
70 * DAG creation routines. Additionally, these wrappers enable experimentation
71 * with new DAG structures by providing an extra level of indirection, allowing
72 * the DAG creation routines to be replaced at this single point.
73 */
74
75
76 void
77 rf_CreateNonRedundantWriteDAG(
78 RF_Raid_t * raidPtr,
79 RF_AccessStripeMap_t * asmap,
80 RF_DagHeader_t * dag_h,
81 void *bp,
82 RF_RaidAccessFlags_t flags,
83 RF_AllocListElem_t * allocList,
84 RF_IoType_t type)
85 {
86 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
87 RF_IO_TYPE_WRITE);
88 }
89
90 void
91 rf_CreateRAID0WriteDAG(
92 RF_Raid_t * raidPtr,
93 RF_AccessStripeMap_t * asmap,
94 RF_DagHeader_t * dag_h,
95 void *bp,
96 RF_RaidAccessFlags_t flags,
97 RF_AllocListElem_t * allocList,
98 RF_IoType_t type)
99 {
100 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
101 RF_IO_TYPE_WRITE);
102 }
103
104 void
105 rf_CreateSmallWriteDAG(
106 RF_Raid_t * raidPtr,
107 RF_AccessStripeMap_t * asmap,
108 RF_DagHeader_t * dag_h,
109 void *bp,
110 RF_RaidAccessFlags_t flags,
111 RF_AllocListElem_t * allocList)
112 {
113 #if RF_FORWARD > 0
114 rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
115 &rf_xorFuncs, NULL);
116 #else /* RF_FORWARD > 0 */
117 #if RF_BACKWARD > 0
118 rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
119 &rf_xorFuncs, NULL);
120 #else /* RF_BACKWARD > 0 */
121 /* "normal" rollaway */
122 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
123 &rf_xorFuncs, NULL);
124 #endif /* RF_BACKWARD > 0 */
125 #endif /* RF_FORWARD > 0 */
126 }
127
128 void
129 rf_CreateLargeWriteDAG(
130 RF_Raid_t * raidPtr,
131 RF_AccessStripeMap_t * asmap,
132 RF_DagHeader_t * dag_h,
133 void *bp,
134 RF_RaidAccessFlags_t flags,
135 RF_AllocListElem_t * allocList)
136 {
137 #if RF_FORWARD > 0
138 rf_CommonCreateLargeWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
139 1, rf_RegularXorFunc, RF_TRUE);
140 #else /* RF_FORWARD > 0 */
141 #if RF_BACKWARD > 0
142 rf_CommonCreateLargeWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
143 1, rf_RegularXorFunc, RF_TRUE);
144 #else /* RF_BACKWARD > 0 */
145 /* "normal" rollaway */
146 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
147 1, rf_RegularXorFunc, RF_TRUE);
148 #endif /* RF_BACKWARD > 0 */
149 #endif /* RF_FORWARD > 0 */
150 }
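/*
 * Editorial note: when RF_FORWARD or RF_BACKWARD is defined to be nonzero at
 * compile time, the wrappers above dispatch to the commit-less *Fwd DAG
 * builders that appear later in this file (used for the forward/backward
 * error recovery experiments); otherwise the normal rollaway builders are
 * used.
 */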
151
152
153 /******************************************************************************
154 *
155 * DAG creation code begins here
156 */
157
158
159 /******************************************************************************
160 *
161 * creates a DAG to perform a large-write operation:
162 *
163 * / Rod \ / Wnd \
164 * H -- block- Rod - Xor - Cmt - Wnd --- T
165 * \ Rod / \ Wnp /
166 * \[Wnq]/
167 *
168 * The XOR node also does the Q calculation in the P+Q architecture.
169 * All nodes before the commit node (Cmt) are assumed to be atomic and
170 * undoable, or to make no changes to permanent state.
171 *
172 * Rod = read old data
173 * Cmt = commit node
174 * Wnp = write new parity
175 * Wnd = write new data
176 * Wnq = write new "q"
177 * [] denotes optional segments in the graph
178 *
179 * Parameters: raidPtr - description of the physical array
180 * asmap - logical & physical addresses for this access
181 * bp - buffer ptr (holds write data)
182 * flags - general flags (e.g. disk locking)
183 * allocList - list of memory allocated in DAG creation
184 * nfaults - number of faults array can tolerate
185 * (equal to # redundancy units in stripe)
186 * redfuncs - list of redundancy generating functions
187 *
188 *****************************************************************************/
189
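/*
 * Worked example (editorial): for a RAID-5 large write (nfaults == 1) whose
 * access map covers three stripe units, the node array allocated below holds
 * nWndNodes + 4 + nfaults = 3 + 4 + 1 = 8 nodes: three Wnd nodes plus the
 * Xor, Wnp, block (Nil), Cmt, and Trm nodes.  With nfaults == 2 one more
 * slot is used for the Wnq node.
 */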
190 void
191 rf_CommonCreateLargeWriteDAG(
192 RF_Raid_t * raidPtr,
193 RF_AccessStripeMap_t * asmap,
194 RF_DagHeader_t * dag_h,
195 void *bp,
196 RF_RaidAccessFlags_t flags,
197 RF_AllocListElem_t * allocList,
198 int nfaults,
199 int (*redFunc) (RF_DagNode_t *),
200 int allowBufferRecycle)
201 {
202 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
203 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
204 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
205 RF_AccessStripeMapHeader_t *new_asm_h[2];
206 RF_StripeNum_t parityStripeID;
207 char *sosBuffer, *eosBuffer;
208 RF_ReconUnitNum_t which_ru;
209 RF_RaidLayout_t *layoutPtr;
210 RF_PhysDiskAddr_t *pda;
211
212 layoutPtr = &(raidPtr->Layout);
213 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress,
214 &which_ru);
215
216 if (rf_dagDebug) {
217 printf("[Creating large-write DAG]\n");
218 }
219 dag_h->creator = "LargeWriteDAG";
220
221 dag_h->numCommitNodes = 1;
222 dag_h->numCommits = 0;
223 dag_h->numSuccedents = 1;
224
225 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
226 nWndNodes = asmap->numStripeUnitsAccessed;
227 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
228 (RF_DagNode_t *), allocList);
229 i = 0;
230 wndNodes = &nodes[i];
231 i += nWndNodes;
232 xorNode = &nodes[i];
233 i += 1;
234 wnpNode = &nodes[i];
235 i += 1;
236 blockNode = &nodes[i];
237 i += 1;
238 commitNode = &nodes[i];
239 i += 1;
240 termNode = &nodes[i];
241 i += 1;
242 if (nfaults == 2) {
243 wnqNode = &nodes[i];
244 i += 1;
245 } else {
246 wnqNode = NULL;
247 }
248 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h,
249 &nRodNodes, &sosBuffer, &eosBuffer, allocList);
250 if (nRodNodes > 0) {
251 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
252 (RF_DagNode_t *), allocList);
253 } else {
254 rodNodes = NULL;
255 }
256
257 /* begin node initialization */
258 if (nRodNodes > 0) {
259 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
260 NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
261 } else {
262 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
263 NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
264 }
265
266 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL,
267 nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList);
268 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL,
269 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
270
271 /* initialize the Rod nodes */
272 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
273 if (new_asm_h[asmNum]) {
274 pda = new_asm_h[asmNum]->stripeMap->physInfo;
275 while (pda) {
276 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,
277 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
278 "Rod", allocList);
279 rodNodes[nodeNum].params[0].p = pda;
280 rodNodes[nodeNum].params[1].p = pda->bufPtr;
281 rodNodes[nodeNum].params[2].v = parityStripeID;
282 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
283 0, 0, which_ru);
284 nodeNum++;
285 pda = pda->next;
286 }
287 }
288 }
289 RF_ASSERT(nodeNum == nRodNodes);
290
291 /* initialize the wnd nodes */
292 pda = asmap->physInfo;
293 for (i = 0; i < nWndNodes; i++) {
294 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
295 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
296 RF_ASSERT(pda != NULL);
297 wndNodes[i].params[0].p = pda;
298 wndNodes[i].params[1].p = pda->bufPtr;
299 wndNodes[i].params[2].v = parityStripeID;
300 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
301 pda = pda->next;
302 }
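	/*
	 * Editorial note: every disk read/write node in this file uses the
	 * same parameter layout shown above: params[0] is the physical disk
	 * address (pda), params[1] is the buffer pointer, params[2] is the
	 * parity stripe ID, and params[3] packs the I/O priority, the
	 * lock/unlock flags, and the reconstruction unit number via
	 * RF_CREATE_PARAM3().
	 */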
303
304 /* initialize the redundancy node */
305 if (nRodNodes > 0) {
306 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
307 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
308 "Xr ", allocList);
309 } else {
310 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
311 1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
312 }
313 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
314 for (i = 0; i < nWndNodes; i++) {
315 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */
316 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */
317 }
318 for (i = 0; i < nRodNodes; i++) {
319 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */
320 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */
321 }
322 /* xor node needs to get at RAID information */
323 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
324
325 /*
326 * Look for a Rod node that reads a complete SU. If none, alloc a buffer
327 * to receive the parity info. Note that we can't use a new data buffer
328 * because it will not have gotten written when the xor occurs.
329 */
330 if (allowBufferRecycle) {
331 for (i = 0; i < nRodNodes; i++) {
332 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
333 break;
334 }
335 }
336 if ((!allowBufferRecycle) || (i == nRodNodes)) {
337 RF_CallocAndAdd(xorNode->results[0], 1,
338 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
339 (void *), allocList);
340 } else {
341 xorNode->results[0] = rodNodes[i].params[1].p;
342 }
343
344 /* initialize the Wnp node */
345 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
346 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
347 wnpNode->params[0].p = asmap->parityInfo;
348 wnpNode->params[1].p = xorNode->results[0];
349 wnpNode->params[2].v = parityStripeID;
350 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
351 /* parityInfo must describe entire parity unit */
352 RF_ASSERT(asmap->parityInfo->next == NULL);
353
354 if (nfaults == 2) {
355 /*
356 * We never try to recycle a buffer for the Q calculation
357 * in addition to the parity. This would cause two buffers
358 * to get smashed during the P and Q calculation, guaranteeing
359 * one would be wrong.
360 */
361 RF_CallocAndAdd(xorNode->results[1], 1,
362 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
363 (void *), allocList);
364 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
365 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
366 wnqNode->params[0].p = asmap->qInfo;
367 wnqNode->params[1].p = xorNode->results[1];
368 wnqNode->params[2].v = parityStripeID;
369 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
370 /* parityInfo must describe entire parity unit */
371 RF_ASSERT(asmap->parityInfo->next == NULL);
372 }
373 /*
374 * Connect nodes to form graph.
375 */
376
377 /* connect dag header to block node */
378 RF_ASSERT(blockNode->numAntecedents == 0);
379 dag_h->succedents[0] = blockNode;
380
381 if (nRodNodes > 0) {
382 /* connect the block node to the Rod nodes */
383 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
384 RF_ASSERT(xorNode->numAntecedents == nRodNodes);
385 for (i = 0; i < nRodNodes; i++) {
386 RF_ASSERT(rodNodes[i].numAntecedents == 1);
387 blockNode->succedents[i] = &rodNodes[i];
388 rodNodes[i].antecedents[0] = blockNode;
389 rodNodes[i].antType[0] = rf_control;
390
391 /* connect the Rod nodes to the Xor node */
392 RF_ASSERT(rodNodes[i].numSuccedents == 1);
393 rodNodes[i].succedents[0] = xorNode;
394 xorNode->antecedents[i] = &rodNodes[i];
395 xorNode->antType[i] = rf_trueData;
396 }
397 } else {
398 /* connect the block node to the Xor node */
399 RF_ASSERT(blockNode->numSuccedents == 1);
400 RF_ASSERT(xorNode->numAntecedents == 1);
401 blockNode->succedents[0] = xorNode;
402 xorNode->antecedents[0] = blockNode;
403 xorNode->antType[0] = rf_control;
404 }
405
406 /* connect the xor node to the commit node */
407 RF_ASSERT(xorNode->numSuccedents == 1);
408 RF_ASSERT(commitNode->numAntecedents == 1);
409 xorNode->succedents[0] = commitNode;
410 commitNode->antecedents[0] = xorNode;
411 commitNode->antType[0] = rf_control;
412
413 /* connect the commit node to the write nodes */
414 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
415 for (i = 0; i < nWndNodes; i++) {
416 RF_ASSERT(wndNodes[i].numAntecedents == 1);
417 commitNode->succedents[i] = &wndNodes[i];
418 wndNodes[i].antecedents[0] = commitNode;
419 wndNodes[i].antType[0] = rf_control;
420 }
421 RF_ASSERT(wnpNode->numAntecedents == 1);
422 commitNode->succedents[nWndNodes] = wnpNode;
423 wnpNode->antecedents[0] = commitNode;
424 wnpNode->antType[0] = rf_trueData;
425 if (nfaults == 2) {
426 RF_ASSERT(wnqNode->numAntecedents == 1);
427 commitNode->succedents[nWndNodes + 1] = wnqNode;
428 wnqNode->antecedents[0] = commitNode;
429 wnqNode->antType[0] = rf_trueData;
430 }
431 /* connect the write nodes to the term node */
432 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
433 RF_ASSERT(termNode->numSuccedents == 0);
434 for (i = 0; i < nWndNodes; i++) {
435 RF_ASSERT(wndNodes[i].numSuccedents == 1);
436 wndNodes[i].succedents[0] = termNode;
437 termNode->antecedents[i] = &wndNodes[i];
438 termNode->antType[i] = rf_control;
439 }
440 RF_ASSERT(wnpNode->numSuccedents == 1);
441 wnpNode->succedents[0] = termNode;
442 termNode->antecedents[nWndNodes] = wnpNode;
443 termNode->antType[nWndNodes] = rf_control;
444 if (nfaults == 2) {
445 RF_ASSERT(wnqNode->numSuccedents == 1);
446 wnqNode->succedents[0] = termNode;
447 termNode->antecedents[nWndNodes + 1] = wnqNode;
448 termNode->antType[nWndNodes + 1] = rf_control;
449 }
450 }
451 /******************************************************************************
452 *
453 * creates a DAG to perform a small-write operation (either raid 5 or pq),
454 * which is as follows:
455 *
456 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
457 * \- Rod X / \----> Wnd [Und]-/
458 * [\- Rod X / \---> Wnd [Und]-/]
459 * [\- Roq -> Q / \--> Wnq [Unq]-/]
460 *
461 * Rop = read old parity
462 * Rod = read old data
463 * Roq = read old "q"
464 * Cmt = commit node
465 * Und = unlock data disk
466 * Unp = unlock parity disk
467 * Unq = unlock q disk
468 * Wnp = write new parity
469 * Wnd = write new data
470 * Wnq = write new "q"
471 * [ ] denotes optional segments in the graph
472 *
473 * Parameters: raidPtr - description of the physical array
474 * asmap - logical & physical addresses for this access
475 * bp - buffer ptr (holds write data)
476 * flags - general flags (e.g. disk locking)
477 * allocList - list of memory allocated in DAG creation
478 * pfuncs - list of parity generating functions
479 * qfuncs - list of q generating functions
480 *
481 * A null qfuncs indicates single fault tolerant
482 *****************************************************************************/
483
484 void
485 rf_CommonCreateSmallWriteDAG(
486 RF_Raid_t * raidPtr,
487 RF_AccessStripeMap_t * asmap,
488 RF_DagHeader_t * dag_h,
489 void *bp,
490 RF_RaidAccessFlags_t flags,
491 RF_AllocListElem_t * allocList,
492 RF_RedFuncs_t * pfuncs,
493 RF_RedFuncs_t * qfuncs)
494 {
495 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
496 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
497 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
498 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
499 int i, j, nNodes, totalNumNodes, lu_flag;
500 RF_ReconUnitNum_t which_ru;
501 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
502 int (*qfunc) (RF_DagNode_t *);
503 int numDataNodes, numParityNodes;
504 RF_StripeNum_t parityStripeID;
505 RF_PhysDiskAddr_t *pda;
506 char *name, *qname;
507 long nfaults;
508
509 nfaults = qfuncs ? 2 : 1;
510 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
511
512 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
513 asmap->raidAddress, &which_ru);
514 pda = asmap->physInfo;
515 numDataNodes = asmap->numStripeUnitsAccessed;
516 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
517
518 if (rf_dagDebug) {
519 printf("[Creating small-write DAG]\n");
520 }
521 RF_ASSERT(numDataNodes > 0);
522 dag_h->creator = "SmallWriteDAG";
523
524 dag_h->numCommitNodes = 1;
525 dag_h->numCommits = 0;
526 dag_h->numSuccedents = 1;
527
528 /*
529 * DAG creation occurs in four steps:
530 * 1. count the number of nodes in the DAG
531 * 2. create the nodes
532 * 3. initialize the nodes
533 * 4. connect the nodes
534 */
535
536 /*
537 * Step 1. compute number of nodes in the graph
538 */
539
540 /* number of nodes: a read and a write for each data unit; a redundancy
541 * computation node for each parity unit (nfaults * numParityNodes); a read
542 * and a write for each parity unit; a block, a commit, and a terminate
543 * node (3); and, if atomic RMW, an unlock node for each data unit and
544 * each redundancy unit */
545 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
546 + (nfaults * 2 * numParityNodes) + 3;
547 if (lu_flag) {
548 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
549 }
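	/*
	 * Worked example (editorial): with numDataNodes == 2,
	 * numParityNodes == 1, nfaults == 1, and locking disabled, the
	 * formula above gives
	 * totalNumNodes = (2 * 2) + (1 * 1) + (1 * 2 * 1) + 3 = 10:
	 * two Rod, two Wnd, one Xor, one Rop, one Wnp, plus the block (Nil),
	 * Cmt, and Trm nodes.
	 */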
550 /*
551 * Step 2. create the nodes
552 */
553 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
554 (RF_DagNode_t *), allocList);
555 i = 0;
556 blockNode = &nodes[i];
557 i += 1;
558 commitNode = &nodes[i];
559 i += 1;
560 readDataNodes = &nodes[i];
561 i += numDataNodes;
562 readParityNodes = &nodes[i];
563 i += numParityNodes;
564 writeDataNodes = &nodes[i];
565 i += numDataNodes;
566 writeParityNodes = &nodes[i];
567 i += numParityNodes;
568 xorNodes = &nodes[i];
569 i += numParityNodes;
570 termNode = &nodes[i];
571 i += 1;
572 if (lu_flag) {
573 unlockDataNodes = &nodes[i];
574 i += numDataNodes;
575 unlockParityNodes = &nodes[i];
576 i += numParityNodes;
577 } else {
578 unlockDataNodes = unlockParityNodes = NULL;
579 }
580 if (nfaults == 2) {
581 readQNodes = &nodes[i];
582 i += numParityNodes;
583 writeQNodes = &nodes[i];
584 i += numParityNodes;
585 qNodes = &nodes[i];
586 i += numParityNodes;
587 if (lu_flag) {
588 unlockQNodes = &nodes[i];
589 i += numParityNodes;
590 } else {
591 unlockQNodes = NULL;
592 }
593 } else {
594 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
595 }
596 RF_ASSERT(i == totalNumNodes);
597
598 /*
599 * Step 3. initialize the nodes
600 */
601 /* initialize block node (Nil) */
602 nNodes = numDataNodes + (nfaults * numParityNodes);
603 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
604 NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
605
606 /* initialize commit node (Cmt) */
607 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
608 NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);
609
610 /* initialize terminate node (Trm) */
611 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
612 NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
613
614 /* initialize nodes which read old data (Rod) */
615 for (i = 0; i < numDataNodes; i++) {
616 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
617 rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h,
618 "Rod", allocList);
619 RF_ASSERT(pda != NULL);
620 /* physical disk addr desc */
621 readDataNodes[i].params[0].p = pda;
622 /* buffer to hold old data */
623 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
624 dag_h, pda, allocList);
625 readDataNodes[i].params[2].v = parityStripeID;
626 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
627 lu_flag, 0, which_ru);
628 pda = pda->next;
629 for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
630 readDataNodes[i].propList[j] = NULL;
631 }
632 }
633
634 /* initialize nodes which read old parity (Rop) */
635 pda = asmap->parityInfo;
636 i = 0;
637 for (i = 0; i < numParityNodes; i++) {
638 RF_ASSERT(pda != NULL);
639 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc,
640 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4,
641 0, dag_h, "Rop", allocList);
642 readParityNodes[i].params[0].p = pda;
643 /* buffer to hold old parity */
644 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
645 dag_h, pda, allocList);
646 readParityNodes[i].params[2].v = parityStripeID;
647 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
648 lu_flag, 0, which_ru);
649 pda = pda->next;
650 for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
651 readParityNodes[i].propList[j] = NULL;
652 }
653 }
654
655 /* initialize nodes which read old Q (Roq) */
656 if (nfaults == 2) {
657 pda = asmap->qInfo;
658 for (i = 0; i < numParityNodes; i++) {
659 RF_ASSERT(pda != NULL);
660 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
661 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
662 readQNodes[i].params[0].p = pda;
663 /* buffer to hold old Q */
664 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda,
665 allocList);
666 readQNodes[i].params[2].v = parityStripeID;
667 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
668 lu_flag, 0, which_ru);
669 pda = pda->next;
670 for (j = 0; j < readQNodes[i].numSuccedents; j++) {
671 readQNodes[i].propList[j] = NULL;
672 }
673 }
674 }
675 /* initialize nodes which write new data (Wnd) */
676 pda = asmap->physInfo;
677 for (i = 0; i < numDataNodes; i++) {
678 RF_ASSERT(pda != NULL);
679 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
680 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
681 "Wnd", allocList);
682 /* physical disk addr desc */
683 writeDataNodes[i].params[0].p = pda;
684 /* buffer holding new data to be written */
685 writeDataNodes[i].params[1].p = pda->bufPtr;
686 writeDataNodes[i].params[2].v = parityStripeID;
687 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
688 0, 0, which_ru);
689 if (lu_flag) {
690 /* initialize node to unlock the disk queue */
691 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
692 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
693 "Und", allocList);
694 /* physical disk addr desc */
695 unlockDataNodes[i].params[0].p = pda;
696 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
697 0, lu_flag, which_ru);
698 }
699 pda = pda->next;
700 }
701
702 /*
703 * Initialize nodes which compute new parity and Q.
704 */
705 /*
706 * We use the simple XOR func in the double-XOR case, and when
707 * we're accessing only a portion of one stripe unit. The distinction
708 * between the two is that the regular XOR func assumes that the targbuf
709 * is a full SU in size, and examines the pda associated with the buffer
710 * to decide where within the buffer to XOR the data, whereas
711 * the simple XOR func just XORs the data into the start of the buffer.
712 */
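	/*
	 * Editorial illustration (hypothetical, guarded out): the two XOR
	 * styles described above.  The real functions are supplied through
	 * the RF_RedFuncs_t structures (e.g. rf_xorFuncs); the local names
	 * and the offset computation below are assumptions for illustration
	 * only.
	 */
#if 0
	/* hypothetical local variables, for illustration only */
	char *targbuf, *srcbuf;
	int len, off_in_su, k;

	/* "simple" style: XOR straight into the start of the target buffer */
	for (k = 0; k < len; k++)
		targbuf[k] ^= srcbuf[k];

	/* "regular" style: the target buffer is a full stripe unit, so the
	 * source pda's offset within that SU selects where to XOR */
	for (k = 0; k < len; k++)
		targbuf[off_in_su + k] ^= srcbuf[k];
#endif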
713 if ((numParityNodes == 2) || ((numDataNodes == 1)
714 && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
715 func = pfuncs->simple;
716 undoFunc = rf_NullNodeUndoFunc;
717 name = pfuncs->SimpleName;
718 if (qfuncs) {
719 qfunc = qfuncs->simple;
720 qname = qfuncs->SimpleName;
721 } else {
722 qfunc = NULL;
723 qname = NULL;
724 }
725 } else {
726 func = pfuncs->regular;
727 undoFunc = rf_NullNodeUndoFunc;
728 name = pfuncs->RegularName;
729 if (qfuncs) {
730 qfunc = qfuncs->regular;
731 qname = qfuncs->RegularName;
732 } else {
733 qfunc = NULL;
734 qname = NULL;
735 }
736 }
737 /*
738 * Initialize the xor nodes: params are {pda,buf}
739 * from {Rod,Wnd,Rop} nodes, and raidPtr
740 */
741 if (numParityNodes == 2) {
742 /* double-xor case */
743 for (i = 0; i < numParityNodes; i++) {
744 /* note: no wakeup func for xor */
745 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL,
746 1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList);
747 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
748 xorNodes[i].params[0] = readDataNodes[i].params[0];
749 xorNodes[i].params[1] = readDataNodes[i].params[1];
750 xorNodes[i].params[2] = readParityNodes[i].params[0];
751 xorNodes[i].params[3] = readParityNodes[i].params[1];
752 xorNodes[i].params[4] = writeDataNodes[i].params[0];
753 xorNodes[i].params[5] = writeDataNodes[i].params[1];
754 xorNodes[i].params[6].p = raidPtr;
755 /* use old parity buf as target buf */
756 xorNodes[i].results[0] = readParityNodes[i].params[1].p;
757 if (nfaults == 2) {
758 /* note: no wakeup func for qor */
759 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
760 (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList);
761 qNodes[i].params[0] = readDataNodes[i].params[0];
762 qNodes[i].params[1] = readDataNodes[i].params[1];
763 qNodes[i].params[2] = readQNodes[i].params[0];
764 qNodes[i].params[3] = readQNodes[i].params[1];
765 qNodes[i].params[4] = writeDataNodes[i].params[0];
766 qNodes[i].params[5] = writeDataNodes[i].params[1];
767 qNodes[i].params[6].p = raidPtr;
768 /* use old Q buf as target buf */
769 qNodes[i].results[0] = readQNodes[i].params[1].p;
770 }
771 }
772 } else {
773 /* there is only one xor node in this case */
774 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1,
775 (numDataNodes + numParityNodes),
776 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
777 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
778 for (i = 0; i < numDataNodes + 1; i++) {
779 /* set up params related to Rod and Rop nodes */
780 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
781 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */
782 }
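		/*
		 * Editorial note: the i == numDataNodes iteration above indexes
		 * one element past readDataNodes[]; because readParityNodes[]
		 * was allocated immediately after readDataNodes[] in the nodes
		 * array (Step 2), that element is readParityNodes[0], which is
		 * how the Rop {pda, buf} pair ends up in the xor params.
		 */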
783 for (i = 0; i < numDataNodes; i++) {
784 /* set up params related to Wnd and Wnp nodes */
785 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */
786 writeDataNodes[i].params[0];
787 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */
788 writeDataNodes[i].params[1];
789 }
790 /* xor node needs to get at RAID information */
791 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
792 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
793 if (nfaults == 2) {
794 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
795 (numDataNodes + numParityNodes),
796 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
797 qname, allocList);
798 for (i = 0; i < numDataNodes; i++) {
799 /* set up params related to Rod */
800 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
801 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */
802 }
803 /* and read old q */
804 qNodes[0].params[2 * numDataNodes + 0] = /* pda */
805 readQNodes[0].params[0];
806 qNodes[0].params[2 * numDataNodes + 1] = /* buffer ptr */
807 readQNodes[0].params[1];
808 for (i = 0; i < numDataNodes; i++) {
809 /* set up params related to Wnd nodes */
810 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */
811 writeDataNodes[i].params[0];
812 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */
813 writeDataNodes[i].params[1];
814 }
815 /* xor node needs to get at RAID information */
816 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
817 qNodes[0].results[0] = readQNodes[0].params[1].p;
818 }
819 }
820
821 /* initialize nodes which write new parity (Wnp) */
822 pda = asmap->parityInfo;
823 for (i = 0; i < numParityNodes; i++) {
824 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
825 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
826 "Wnp", allocList);
827 RF_ASSERT(pda != NULL);
828 writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr)
829 * filled in by xor node */
830 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for
831 * parity write
832 * operation */
833 writeParityNodes[i].params[2].v = parityStripeID;
834 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
835 0, 0, which_ru);
836 if (lu_flag) {
837 /* initialize node to unlock the disk queue */
838 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
839 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
840 "Unp", allocList);
841 unlockParityNodes[i].params[0].p = pda; /* physical disk addr
842 * desc */
843 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
844 0, lu_flag, which_ru);
845 }
846 pda = pda->next;
847 }
848
849 /* initialize nodes which write new Q (Wnq) */
850 if (nfaults == 2) {
851 pda = asmap->qInfo;
852 for (i = 0; i < numParityNodes; i++) {
853 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
854 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
855 "Wnq", allocList);
856 RF_ASSERT(pda != NULL);
857 writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr)
858 * filled in by xor node */
859 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for
860 * parity write
861 * operation */
862 writeQNodes[i].params[2].v = parityStripeID;
863 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
864 0, 0, which_ru);
865 if (lu_flag) {
866 /* initialize node to unlock the disk queue */
867 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
868 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
869 "Unq", allocList);
870 unlockQNodes[i].params[0].p = pda; /* physical disk addr
871 * desc */
872 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
873 0, lu_flag, which_ru);
874 }
875 pda = pda->next;
876 }
877 }
878 /*
879 * Step 4. connect the nodes.
880 */
881
882 /* connect header to block node */
883 dag_h->succedents[0] = blockNode;
884
885 /* connect block node to read old data nodes */
886 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
887 for (i = 0; i < numDataNodes; i++) {
888 blockNode->succedents[i] = &readDataNodes[i];
889 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
890 readDataNodes[i].antecedents[0] = blockNode;
891 readDataNodes[i].antType[0] = rf_control;
892 }
893
894 /* connect block node to read old parity nodes */
895 for (i = 0; i < numParityNodes; i++) {
896 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
897 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
898 readParityNodes[i].antecedents[0] = blockNode;
899 readParityNodes[i].antType[0] = rf_control;
900 }
901
902 /* connect block node to read old Q nodes */
903 if (nfaults == 2) {
904 for (i = 0; i < numParityNodes; i++) {
905 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
906 RF_ASSERT(readQNodes[i].numAntecedents == 1);
907 readQNodes[i].antecedents[0] = blockNode;
908 readQNodes[i].antType[0] = rf_control;
909 }
910 }
911 /* connect read old data nodes to xor nodes */
912 for (i = 0; i < numDataNodes; i++) {
913 RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
914 for (j = 0; j < numParityNodes; j++) {
915 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
916 readDataNodes[i].succedents[j] = &xorNodes[j];
917 xorNodes[j].antecedents[i] = &readDataNodes[i];
918 xorNodes[j].antType[i] = rf_trueData;
919 }
920 }
921
922 /* connect read old data nodes to q nodes */
923 if (nfaults == 2) {
924 for (i = 0; i < numDataNodes; i++) {
925 for (j = 0; j < numParityNodes; j++) {
926 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
927 readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
928 qNodes[j].antecedents[i] = &readDataNodes[i];
929 qNodes[j].antType[i] = rf_trueData;
930 }
931 }
932 }
933 /* connect read old parity nodes to xor nodes */
934 for (i = 0; i < numParityNodes; i++) {
935 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
936 for (j = 0; j < numParityNodes; j++) {
937 readParityNodes[i].succedents[j] = &xorNodes[j];
938 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
939 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
940 }
941 }
942
943 /* connect read old q nodes to q nodes */
944 if (nfaults == 2) {
945 for (i = 0; i < numParityNodes; i++) {
946 RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
947 for (j = 0; j < numParityNodes; j++) {
948 readQNodes[i].succedents[j] = &qNodes[j];
949 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
950 qNodes[j].antType[numDataNodes + i] = rf_trueData;
951 }
952 }
953 }
954 /* connect xor nodes to commit node */
955 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
956 for (i = 0; i < numParityNodes; i++) {
957 RF_ASSERT(xorNodes[i].numSuccedents == 1);
958 xorNodes[i].succedents[0] = commitNode;
959 commitNode->antecedents[i] = &xorNodes[i];
960 commitNode->antType[i] = rf_control;
961 }
962
963 /* connect q nodes to commit node */
964 if (nfaults == 2) {
965 for (i = 0; i < numParityNodes; i++) {
966 RF_ASSERT(qNodes[i].numSuccedents == 1);
967 qNodes[i].succedents[0] = commitNode;
968 commitNode->antecedents[i + numParityNodes] = &qNodes[i];
969 commitNode->antType[i + numParityNodes] = rf_control;
970 }
971 }
972 /* connect commit node to write nodes */
973 RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
974 for (i = 0; i < numDataNodes; i++) {
975 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
976 commitNode->succedents[i] = &writeDataNodes[i];
977 writeDataNodes[i].antecedents[0] = commitNode;
978 writeDataNodes[i].antType[0] = rf_trueData;
979 }
980 for (i = 0; i < numParityNodes; i++) {
981 RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
982 commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
983 writeParityNodes[i].antecedents[0] = commitNode;
984 writeParityNodes[i].antType[0] = rf_trueData;
985 }
986 if (nfaults == 2) {
987 for (i = 0; i < numParityNodes; i++) {
988 RF_ASSERT(writeQNodes[i].numAntecedents == 1);
989 commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
990 writeQNodes[i].antecedents[0] = commitNode;
991 writeQNodes[i].antType[0] = rf_trueData;
992 }
993 }
994 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
995 RF_ASSERT(termNode->numSuccedents == 0);
996 for (i = 0; i < numDataNodes; i++) {
997 if (lu_flag) {
998 /* connect write new data nodes to unlock nodes */
999 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1000 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1001 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1002 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1003 unlockDataNodes[i].antType[0] = rf_control;
1004
1005 /* connect unlock nodes to term node */
1006 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1007 unlockDataNodes[i].succedents[0] = termNode;
1008 termNode->antecedents[i] = &unlockDataNodes[i];
1009 termNode->antType[i] = rf_control;
1010 } else {
1011 /* connect write new data nodes to term node */
1012 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1013 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1014 writeDataNodes[i].succedents[0] = termNode;
1015 termNode->antecedents[i] = &writeDataNodes[i];
1016 termNode->antType[i] = rf_control;
1017 }
1018 }
1019
1020 for (i = 0; i < numParityNodes; i++) {
1021 if (lu_flag) {
1022 /* connect write new parity nodes to unlock nodes */
1023 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1024 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1025 writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
1026 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
1027 unlockParityNodes[i].antType[0] = rf_control;
1028
1029 /* connect unlock nodes to term node */
1030 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1031 unlockParityNodes[i].succedents[0] = termNode;
1032 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
1033 termNode->antType[numDataNodes + i] = rf_control;
1034 } else {
1035 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1036 writeParityNodes[i].succedents[0] = termNode;
1037 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
1038 termNode->antType[numDataNodes + i] = rf_control;
1039 }
1040 }
1041
1042 if (nfaults == 2) {
1043 for (i = 0; i < numParityNodes; i++) {
1044 if (lu_flag) {
1045 /* connect write new Q nodes to unlock nodes */
1046 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1047 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1048 writeQNodes[i].succedents[0] = &unlockQNodes[i];
1049 unlockQNodes[i].antecedents[0] = &writeQNodes[i];
1050 unlockQNodes[i].antType[0] = rf_control;
1051
1052 /* connect unlock nodes to term node */
1053 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1054 unlockQNodes[i].succedents[0] = termNode;
1055 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
1056 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1057 } else {
1058 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1059 writeQNodes[i].succedents[0] = termNode;
1060 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
1061 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1062 }
1063 }
1064 }
1065 }
1066
1067
1068 /******************************************************************************
1069 * create a write graph (fault-free or degraded) for RAID level 1
1070 *
1071 * Hdr -> Commit -> Wpd -> Nil -> Trm
1072 * -> Wsd ->
1073 *
1074 * The "Wpd" node writes data to the primary copy in the mirror pair
1075 * The "Wsd" node writes data to the secondary copy in the mirror pair
1076 *
1077 * Parameters: raidPtr - description of the physical array
1078 * asmap - logical & physical addresses for this access
1079 * bp - buffer ptr (holds write data)
1080 * flags - general flags (e.g. disk locking)
1081 * allocList - list of memory allocated in DAG creation
1082 *****************************************************************************/
1083
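/*
 * Worked example (editorial): for a fault-free, stripe-unit-aligned access,
 * nWndNodes == nWmirNodes == 1, so the node array allocated below holds
 * 1 + 1 + 3 = 5 nodes: one Wpd, one Wsd, plus the Cmt, Nil (unblock), and
 * Trm nodes.
 */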
1084 void
1085 rf_CreateRaidOneWriteDAG(
1086 RF_Raid_t * raidPtr,
1087 RF_AccessStripeMap_t * asmap,
1088 RF_DagHeader_t * dag_h,
1089 void *bp,
1090 RF_RaidAccessFlags_t flags,
1091 RF_AllocListElem_t * allocList)
1092 {
1093 RF_DagNode_t *unblockNode, *termNode, *commitNode;
1094 RF_DagNode_t *nodes, *wndNode, *wmirNode;
1095 int nWndNodes, nWmirNodes, i;
1096 RF_ReconUnitNum_t which_ru;
1097 RF_PhysDiskAddr_t *pda, *pdaP;
1098 RF_StripeNum_t parityStripeID;
1099
1100 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1101 asmap->raidAddress, &which_ru);
1102 if (rf_dagDebug) {
1103 printf("[Creating RAID level 1 write DAG]\n");
1104 }
1105 dag_h->creator = "RaidOneWriteDAG";
1106
1107 /* 2 implies access not SU aligned */
1108 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
1109 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
1110
1111 /* alloc the Wnd nodes and the Wmir node */
1112 if (asmap->numDataFailed == 1)
1113 nWndNodes--;
1114 if (asmap->numParityFailed == 1)
1115 nWmirNodes--;
1116
1117 /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock
1118 * + terminator) */
1119 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
1120 (RF_DagNode_t *), allocList);
1121 i = 0;
1122 wndNode = &nodes[i];
1123 i += nWndNodes;
1124 wmirNode = &nodes[i];
1125 i += nWmirNodes;
1126 commitNode = &nodes[i];
1127 i += 1;
1128 unblockNode = &nodes[i];
1129 i += 1;
1130 termNode = &nodes[i];
1131 i += 1;
1132 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
1133
1134 /* this dag can commit immediately */
1135 dag_h->numCommitNodes = 1;
1136 dag_h->numCommits = 0;
1137 dag_h->numSuccedents = 1;
1138
1139 /* initialize the commit, unblock, and term nodes */
1140 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
1141 NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList);
1142 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
1143 NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
1144 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
1145 NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
1146
1147 /* initialize the wnd nodes */
1148 if (nWndNodes > 0) {
1149 pda = asmap->physInfo;
1150 for (i = 0; i < nWndNodes; i++) {
1151 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1152 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
1153 RF_ASSERT(pda != NULL);
1154 wndNode[i].params[0].p = pda;
1155 wndNode[i].params[1].p = pda->bufPtr;
1156 wndNode[i].params[2].v = parityStripeID;
1157 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1158 pda = pda->next;
1159 }
1160 RF_ASSERT(pda == NULL);
1161 }
1162 /* initialize the mirror nodes */
1163 if (nWmirNodes > 0) {
1164 pda = asmap->physInfo;
1165 pdaP = asmap->parityInfo;
1166 for (i = 0; i < nWmirNodes; i++) {
1167 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1168 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
1169 RF_ASSERT(pda != NULL);
1170 wmirNode[i].params[0].p = pdaP;
1171 wmirNode[i].params[1].p = pda->bufPtr;
1172 wmirNode[i].params[2].v = parityStripeID;
1173 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1174 pda = pda->next;
1175 pdaP = pdaP->next;
1176 }
1177 RF_ASSERT(pda == NULL);
1178 RF_ASSERT(pdaP == NULL);
1179 }
1180 /* link the header node to the commit node */
1181 RF_ASSERT(dag_h->numSuccedents == 1);
1182 RF_ASSERT(commitNode->numAntecedents == 0);
1183 dag_h->succedents[0] = commitNode;
1184
1185 /* link the commit node to the write nodes */
1186 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
1187 for (i = 0; i < nWndNodes; i++) {
1188 RF_ASSERT(wndNode[i].numAntecedents == 1);
1189 commitNode->succedents[i] = &wndNode[i];
1190 wndNode[i].antecedents[0] = commitNode;
1191 wndNode[i].antType[0] = rf_control;
1192 }
1193 for (i = 0; i < nWmirNodes; i++) {
1194 RF_ASSERT(wmirNode[i].numAntecedents == 1);
1195 commitNode->succedents[i + nWndNodes] = &wmirNode[i];
1196 wmirNode[i].antecedents[0] = commitNode;
1197 wmirNode[i].antType[0] = rf_control;
1198 }
1199
1200 /* link the write nodes to the unblock node */
1201 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
1202 for (i = 0; i < nWndNodes; i++) {
1203 RF_ASSERT(wndNode[i].numSuccedents == 1);
1204 wndNode[i].succedents[0] = unblockNode;
1205 unblockNode->antecedents[i] = &wndNode[i];
1206 unblockNode->antType[i] = rf_control;
1207 }
1208 for (i = 0; i < nWmirNodes; i++) {
1209 RF_ASSERT(wmirNode[i].numSuccedents == 1);
1210 wmirNode[i].succedents[0] = unblockNode;
1211 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
1212 unblockNode->antType[i + nWndNodes] = rf_control;
1213 }
1214
1215 /* link the unblock node to the term node */
1216 RF_ASSERT(unblockNode->numSuccedents == 1);
1217 RF_ASSERT(termNode->numAntecedents == 1);
1218 RF_ASSERT(termNode->numSuccedents == 0);
1219 unblockNode->succedents[0] = termNode;
1220 termNode->antecedents[0] = unblockNode;
1221 termNode->antType[0] = rf_control;
1222 }
1223
1224
1225
1226 /* DAGs which have no commit points.
1227 *
1228 * The following DAGs are used in forward and backward error recovery experiments.
1229 * They are identical to the DAGs above this comment except that the
1230 * commit points have been removed.
1231 */
1232
1233
1234
1235 void
1236 rf_CommonCreateLargeWriteDAGFwd(
1237 RF_Raid_t * raidPtr,
1238 RF_AccessStripeMap_t * asmap,
1239 RF_DagHeader_t * dag_h,
1240 void *bp,
1241 RF_RaidAccessFlags_t flags,
1242 RF_AllocListElem_t * allocList,
1243 int nfaults,
1244 int (*redFunc) (RF_DagNode_t *),
1245 int allowBufferRecycle)
1246 {
1247 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
1248 RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
1249 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
1250 RF_AccessStripeMapHeader_t *new_asm_h[2];
1251 RF_StripeNum_t parityStripeID;
1252 char *sosBuffer, *eosBuffer;
1253 RF_ReconUnitNum_t which_ru;
1254 RF_RaidLayout_t *layoutPtr;
1255 RF_PhysDiskAddr_t *pda;
1256
1257 layoutPtr = &(raidPtr->Layout);
1258 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
1259
1260 if (rf_dagDebug)
1261 printf("[Creating large-write DAG]\n");
1262 dag_h->creator = "LargeWriteDAGFwd";
1263
1264 dag_h->numCommitNodes = 0;
1265 dag_h->numCommits = 0;
1266 dag_h->numSuccedents = 1;
1267
1268 /* alloc the nodes: Wnd, xor, sync, block, term, and Wnp */
1269 nWndNodes = asmap->numStripeUnitsAccessed;
1270 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1271 i = 0;
1272 wndNodes = &nodes[i];
1273 i += nWndNodes;
1274 xorNode = &nodes[i];
1275 i += 1;
1276 wnpNode = &nodes[i];
1277 i += 1;
1278 blockNode = &nodes[i];
1279 i += 1;
1280 syncNode = &nodes[i];
1281 i += 1;
1282 termNode = &nodes[i];
1283 i += 1;
1284 if (nfaults == 2) {
1285 wnqNode = &nodes[i];
1286 i += 1;
1287 } else {
1288 wnqNode = NULL;
1289 }
1290 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
1291 if (nRodNodes > 0) {
1292 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1293 } else {
1294 rodNodes = NULL;
1295 }
1296
1297 /* begin node initialization */
1298 if (nRodNodes > 0) {
1299 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
1300 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0, dag_h, "Nil", allocList);
1301 } else {
1302 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
1303 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h, "Nil", allocList);
1304 }
1305
1306 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
1307
1308 /* initialize the Rod nodes */
1309 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
1310 if (new_asm_h[asmNum]) {
1311 pda = new_asm_h[asmNum]->stripeMap->physInfo;
1312 while (pda) {
1313 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList);
1314 rodNodes[nodeNum].params[0].p = pda;
1315 rodNodes[nodeNum].params[1].p = pda->bufPtr;
1316 rodNodes[nodeNum].params[2].v = parityStripeID;
1317 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1318 nodeNum++;
1319 pda = pda->next;
1320 }
1321 }
1322 }
1323 RF_ASSERT(nodeNum == nRodNodes);
1324
1325 /* initialize the wnd nodes */
1326 pda = asmap->physInfo;
1327 for (i = 0; i < nWndNodes; i++) {
1328 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1329 RF_ASSERT(pda != NULL);
1330 wndNodes[i].params[0].p = pda;
1331 wndNodes[i].params[1].p = pda->bufPtr;
1332 wndNodes[i].params[2].v = parityStripeID;
1333 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1334 pda = pda->next;
1335 }
1336
1337 /* initialize the redundancy node */
1338 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, nfaults, 1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
1339 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
1340 for (i = 0; i < nWndNodes; i++) {
1341 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */
1342 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */
1343 }
1344 for (i = 0; i < nRodNodes; i++) {
1345 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */
1346 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */
1347 }
1348 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; /* xor node needs to get
1349 * at RAID information */
1350
1351 /* look for a Rod node that reads a complete SU. If none, alloc a
1352 * buffer to receive the parity info. Note that we can't use a new
1353 * data buffer because it will not have gotten written when the xor
1354 * occurs. */
1355 if (allowBufferRecycle) {
1356 for (i = 0; i < nRodNodes; i++)
1357 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
1358 break;
1359 }
1360 if ((!allowBufferRecycle) || (i == nRodNodes)) {
1361 RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
1362 } else
1363 xorNode->results[0] = rodNodes[i].params[1].p;
1364
1365 /* initialize the Wnp node */
1366 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
1367 wnpNode->params[0].p = asmap->parityInfo;
1368 wnpNode->params[1].p = xorNode->results[0];
1369 wnpNode->params[2].v = parityStripeID;
1370 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1371 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must
1372 * describe entire
1373 * parity unit */
1374
1375 if (nfaults == 2) {
1376 /* we never try to recycle a buffer for the Q calculation in
1377 * addition to the parity. This would cause two buffers to get
1378 * smashed during the P and Q calculation, guaranteeing one
1379 * would be wrong. */
1380 RF_CallocAndAdd(xorNode->results[1], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
1381 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
1382 wnqNode->params[0].p = asmap->qInfo;
1383 wnqNode->params[1].p = xorNode->results[1];
1384 wnqNode->params[2].v = parityStripeID;
1385 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1386 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must
1387 * describe entire
1388 * parity unit */
1389 }
1390 /* connect nodes to form graph */
1391
1392 /* connect dag header to block node */
1393 RF_ASSERT(blockNode->numAntecedents == 0);
1394 dag_h->succedents[0] = blockNode;
1395
1396 if (nRodNodes > 0) {
1397 /* connect the block node to the Rod nodes */
1398 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
1399 RF_ASSERT(syncNode->numAntecedents == nRodNodes);
1400 for (i = 0; i < nRodNodes; i++) {
1401 RF_ASSERT(rodNodes[i].numAntecedents == 1);
1402 blockNode->succedents[i] = &rodNodes[i];
1403 rodNodes[i].antecedents[0] = blockNode;
1404 rodNodes[i].antType[0] = rf_control;
1405
1406 /* connect the Rod nodes to the Nil node */
1407 RF_ASSERT(rodNodes[i].numSuccedents == 1);
1408 rodNodes[i].succedents[0] = syncNode;
1409 syncNode->antecedents[i] = &rodNodes[i];
1410 syncNode->antType[i] = rf_trueData;
1411 }
1412 } else {
1413 /* connect the block node to the Nil node */
1414 RF_ASSERT(blockNode->numSuccedents == 1);
1415 RF_ASSERT(syncNode->numAntecedents == 1);
1416 blockNode->succedents[0] = syncNode;
1417 syncNode->antecedents[0] = blockNode;
1418 syncNode->antType[0] = rf_control;
1419 }
1420
1421 /* connect the sync node to the Wnd nodes */
1422 RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
1423 for (i = 0; i < nWndNodes; i++) {
1424 RF_ASSERT(wndNodes[i].numAntecedents == 1);
1425 syncNode->succedents[i] = &wndNodes[i];
1426 wndNodes[i].antecedents[0] = syncNode;
1427 wndNodes[i].antType[0] = rf_control;
1428 }
1429
1430 /* connect the sync node to the Xor node */
1431 RF_ASSERT(xorNode->numAntecedents == 1);
1432 syncNode->succedents[nWndNodes] = xorNode;
1433 xorNode->antecedents[0] = syncNode;
1434 xorNode->antType[0] = rf_control;
1435
1436 /* connect the xor node to the write parity node */
1437 RF_ASSERT(xorNode->numSuccedents == nfaults);
1438 RF_ASSERT(wnpNode->numAntecedents == 1);
1439 xorNode->succedents[0] = wnpNode;
1440 wnpNode->antecedents[0] = xorNode;
1441 wnpNode->antType[0] = rf_trueData;
1442 if (nfaults == 2) {
1443 RF_ASSERT(wnqNode->numAntecedents == 1);
1444 xorNode->succedents[1] = wnqNode;
1445 wnqNode->antecedents[0] = xorNode;
1446 wnqNode->antType[0] = rf_trueData;
1447 }
1448 /* connect the write nodes to the term node */
1449 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
1450 RF_ASSERT(termNode->numSuccedents == 0);
1451 for (i = 0; i < nWndNodes; i++) {
1452 RF_ASSERT(wndNodes[i].numSuccedents == 1);
1453 wndNodes[i].succedents[0] = termNode;
1454 termNode->antecedents[i] = &wndNodes[i];
1455 termNode->antType[i] = rf_control;
1456 }
1457 RF_ASSERT(wnpNode->numSuccedents == 1);
1458 wnpNode->succedents[0] = termNode;
1459 termNode->antecedents[nWndNodes] = wnpNode;
1460 termNode->antType[nWndNodes] = rf_control;
1461 if (nfaults == 2) {
1462 RF_ASSERT(wnqNode->numSuccedents == 1);
1463 wnqNode->succedents[0] = termNode;
1464 termNode->antecedents[nWndNodes + 1] = wnqNode;
1465 termNode->antType[nWndNodes + 1] = rf_control;
1466 }
1467 }
1468
1469
1470 /******************************************************************************
1471 *
1472 * creates a DAG to perform a small-write operation (either raid 5 or pq),
1473 * which is as follows:
1474 *
1475 * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
1476 * \- Rod X- Wnd [Und] -------/
1477 * [\- Rod X- Wnd [Und] ------/]
1478 * [\- Roq - Q --> Wnq [Unq]-/]
1479 *
1480 * Rop = read old parity
1481 * Rod = read old data
1482 * Roq = read old "q"
1483 * Cmt = commit node
1484 * Und = unlock data disk
1485 * Unp = unlock parity disk
1486 * Unq = unlock q disk
1487 * Wnp = write new parity
1488 * Wnd = write new data
1489 * Wnq = write new "q"
1490 * [ ] denotes optional segments in the graph
1491 *
1492 * Parameters: raidPtr - description of the physical array
1493 * asmap - logical & physical addresses for this access
1494 * bp - buffer ptr (holds write data)
1495 * flags - general flags (e.g. disk locking)
1496 * allocList - list of memory allocated in DAG creation
1497 * pfuncs - list of parity generating functions
1498 * qfuncs - list of q generating functions
1499 *
1500 * A null qfuncs indicates single fault tolerant
1501 *****************************************************************************/
1502
1503 void
1504 rf_CommonCreateSmallWriteDAGFwd(
1505 RF_Raid_t * raidPtr,
1506 RF_AccessStripeMap_t * asmap,
1507 RF_DagHeader_t * dag_h,
1508 void *bp,
1509 RF_RaidAccessFlags_t flags,
1510 RF_AllocListElem_t * allocList,
1511 RF_RedFuncs_t * pfuncs,
1512 RF_RedFuncs_t * qfuncs)
1513 {
1514 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
1515 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
1516 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
1517 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
1518 int i, j, nNodes, totalNumNodes, lu_flag;
1519 RF_ReconUnitNum_t which_ru;
1520 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
1521 int (*qfunc) (RF_DagNode_t *);
1522 int numDataNodes, numParityNodes;
1523 RF_StripeNum_t parityStripeID;
1524 RF_PhysDiskAddr_t *pda;
1525 char *name, *qname;
1526 long nfaults;
1527
1528 nfaults = qfuncs ? 2 : 1;
1529 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
1530
1531 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
1532 pda = asmap->physInfo;
1533 numDataNodes = asmap->numStripeUnitsAccessed;
1534 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
1535
1536 if (rf_dagDebug)
1537 printf("[Creating small-write DAG]\n");
1538 RF_ASSERT(numDataNodes > 0);
1539 dag_h->creator = "SmallWriteDAGFwd";
1540
1541 dag_h->numCommitNodes = 0;
1542 dag_h->numCommits = 0;
1543 dag_h->numSuccedents = 1;
1544
1545 qfunc = NULL;
1546 qname = NULL;
1547
1548 	/* DAG creation occurs in four steps:
1549 	 * 1. count the number of nodes in the DAG  2. create the nodes
1550 	 * 3. initialize the nodes  4. connect the nodes */
1551
1552 /* Step 1. compute number of nodes in the graph */
1553
1554 	/* Number of nodes: a read and a write for each data unit; nfaults
1555 	 * redundancy computation nodes per parity unit (nfaults * nparity);
1556 	 * a read and a write for each parity unit; a block node; a terminate
1557 	 * node; if atomic RMW, an unlock node per data and redundancy unit. */
1558 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) + (nfaults * 2 * numParityNodes) + 2;
1559 if (lu_flag)
1560 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
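
	/*
	 * Worked example of the count above: a RAID 5 small write touching
	 * two data units of one parity stripe (numDataNodes = 2,
	 * numParityNodes = 1, nfaults = 1) with atomic RMW disabled needs
	 * (2*2) + (1*1) + (1*2*1) + 2 = 9 nodes: Nil, 2 Rod, 1 Rop, 2 Wnd,
	 * 1 Wnp, 1 Xor, and Trm.
	 */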
1561
1562
1563 /* Step 2. create the nodes */
1564 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1565 i = 0;
1566 blockNode = &nodes[i];
1567 i += 1;
1568 readDataNodes = &nodes[i];
1569 i += numDataNodes;
1570 readParityNodes = &nodes[i];
1571 i += numParityNodes;
1572 writeDataNodes = &nodes[i];
1573 i += numDataNodes;
1574 writeParityNodes = &nodes[i];
1575 i += numParityNodes;
1576 xorNodes = &nodes[i];
1577 i += numParityNodes;
1578 termNode = &nodes[i];
1579 i += 1;
1580 if (lu_flag) {
1581 unlockDataNodes = &nodes[i];
1582 i += numDataNodes;
1583 unlockParityNodes = &nodes[i];
1584 i += numParityNodes;
1585 } else {
1586 unlockDataNodes = unlockParityNodes = NULL;
1587 }
1588 if (nfaults == 2) {
1589 readQNodes = &nodes[i];
1590 i += numParityNodes;
1591 writeQNodes = &nodes[i];
1592 i += numParityNodes;
1593 qNodes = &nodes[i];
1594 i += numParityNodes;
1595 if (lu_flag) {
1596 unlockQNodes = &nodes[i];
1597 i += numParityNodes;
1598 } else {
1599 unlockQNodes = NULL;
1600 }
1601 } else {
1602 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
1603 }
1604 RF_ASSERT(i == totalNumNodes);
1605
1606 /* Step 3. initialize the nodes */
1607 /* initialize block node (Nil) */
1608 nNodes = numDataNodes + (nfaults * numParityNodes);
1609 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
1610
1611 /* initialize terminate node (Trm) */
1612 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
1613
1614 /* initialize nodes which read old data (Rod) */
1615 for (i = 0; i < numDataNodes; i++) {
1616 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h, "Rod", allocList);
1617 RF_ASSERT(pda != NULL);
1618 readDataNodes[i].params[0].p = pda; /* physical disk addr
1619 * desc */
1620 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old
1621 * data */
1622 readDataNodes[i].params[2].v = parityStripeID;
1623 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1624 pda = pda->next;
1625 for (j = 0; j < readDataNodes[i].numSuccedents; j++)
1626 readDataNodes[i].propList[j] = NULL;
1627 }
1628
1629 /* initialize nodes which read old parity (Rop) */
1630 pda = asmap->parityInfo;
1631 i = 0;
1632 for (i = 0; i < numParityNodes; i++) {
1633 RF_ASSERT(pda != NULL);
1634 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
1635 readParityNodes[i].params[0].p = pda;
1636 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old
1637 * parity */
1638 readParityNodes[i].params[2].v = parityStripeID;
1639 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1640 for (j = 0; j < readParityNodes[i].numSuccedents; j++)
1641 			readParityNodes[i].propList[j] = NULL;
1642 pda = pda->next;
1643 }
1644
1645 /* initialize nodes which read old Q (Roq) */
1646 if (nfaults == 2) {
1647 pda = asmap->qInfo;
1648 for (i = 0; i < numParityNodes; i++) {
1649 RF_ASSERT(pda != NULL);
1650 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
1651 readQNodes[i].params[0].p = pda;
1652 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old Q */
1653 readQNodes[i].params[2].v = parityStripeID;
1654 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1655 for (j = 0; j < readQNodes[i].numSuccedents; j++)
1656 				readQNodes[i].propList[j] = NULL;
1657 pda = pda->next;
1658 }
1659 }
1660 /* initialize nodes which write new data (Wnd) */
1661 pda = asmap->physInfo;
1662 for (i = 0; i < numDataNodes; i++) {
1663 RF_ASSERT(pda != NULL);
1664 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1665 writeDataNodes[i].params[0].p = pda; /* physical disk addr
1666 * desc */
1667 writeDataNodes[i].params[1].p = pda->bufPtr; /* buffer holding new
1668 * data to be written */
1669 writeDataNodes[i].params[2].v = parityStripeID;
1670 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1671
1672 if (lu_flag) {
1673 /* initialize node to unlock the disk queue */
1674 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
1675 unlockDataNodes[i].params[0].p = pda; /* physical disk addr
1676 * desc */
1677 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1678 }
1679 pda = pda->next;
1680 }
1681
1682
1683 /* initialize nodes which compute new parity and Q */
1684 /* we use the simple XOR func in the double-XOR case, and when we're
1685 * accessing only a portion of one stripe unit. the distinction
1686 * between the two is that the regular XOR func assumes that the
1687 * targbuf is a full SU in size, and examines the pda associated with
1688 * the buffer to decide where within the buffer to XOR the data,
1689 * whereas the simple XOR func just XORs the data into the start of
1690 * the buffer. */
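	/*
	 * Conceptually (sketch only; these helper names are illustrative,
	 * not actual RAIDframe routines):
	 *
	 *   regular: xor_range(targbuf + offset_of(pda), srcbuf, len);
	 *   simple:  xor_range(targbuf, srcbuf, len);
	 *
	 * i.e. the regular function locates the data within a full-SU target
	 * buffer using the pda, while the simple function always starts at
	 * byte 0 of the target buffer.
	 */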
1691 if ((numParityNodes == 2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
1692 func = pfuncs->simple;
1693 undoFunc = rf_NullNodeUndoFunc;
1694 name = pfuncs->SimpleName;
1695 if (qfuncs) {
1696 qfunc = qfuncs->simple;
1697 qname = qfuncs->SimpleName;
1698 }
1699 } else {
1700 func = pfuncs->regular;
1701 undoFunc = rf_NullNodeUndoFunc;
1702 name = pfuncs->RegularName;
1703 if (qfuncs) {
1704 qfunc = qfuncs->regular;
1705 qname = qfuncs->RegularName;
1706 }
1707 }
1708 /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop}
1709 * nodes, and raidPtr */
1710 if (numParityNodes == 2) { /* double-xor case */
1711 for (i = 0; i < numParityNodes; i++) {
1712 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, name, allocList); /* no wakeup func for
1713 * xor */
1714 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
1715 xorNodes[i].params[0] = readDataNodes[i].params[0];
1716 xorNodes[i].params[1] = readDataNodes[i].params[1];
1717 xorNodes[i].params[2] = readParityNodes[i].params[0];
1718 xorNodes[i].params[3] = readParityNodes[i].params[1];
1719 xorNodes[i].params[4] = writeDataNodes[i].params[0];
1720 xorNodes[i].params[5] = writeDataNodes[i].params[1];
1721 xorNodes[i].params[6].p = raidPtr;
1722 xorNodes[i].results[0] = readParityNodes[i].params[1].p; /* use old parity buf as
1723 * target buf */
1724 if (nfaults == 2) {
1725 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, qname, allocList); /* no wakeup func for
1726 											 * q */
1727 qNodes[i].params[0] = readDataNodes[i].params[0];
1728 qNodes[i].params[1] = readDataNodes[i].params[1];
1729 qNodes[i].params[2] = readQNodes[i].params[0];
1730 qNodes[i].params[3] = readQNodes[i].params[1];
1731 qNodes[i].params[4] = writeDataNodes[i].params[0];
1732 qNodes[i].params[5] = writeDataNodes[i].params[1];
1733 qNodes[i].params[6].p = raidPtr;
1734 qNodes[i].results[0] = readQNodes[i].params[1].p; /* use old Q buf as
1735 * target buf */
1736 }
1737 }
1738 } else {
1739 /* there is only one xor node in this case */
1740 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
1741 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
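		/*
		 * Parameter layout for the single xor node: 2*(numDataNodes+1)
		 * params hold the {pda,buf} pairs of the Rod nodes plus the one
		 * Rop node, the next 2*numDataNodes params hold the Wnd pairs,
		 * and the last param is raidPtr, for 2*(2*numDataNodes+1)+1
		 * total (e.g. 15 params when numDataNodes == 3).  The first loop
		 * below runs to numDataNodes inclusive: because readParityNodes
		 * immediately follows readDataNodes in the nodes array,
		 * readDataNodes[numDataNodes] is readParityNodes[0], which
		 * supplies the Rop pair.
		 */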
1742 for (i = 0; i < numDataNodes + 1; i++) {
1743 /* set up params related to Rod and Rop nodes */
1744 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
1745 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */
1746 }
1747 for (i = 0; i < numDataNodes; i++) {
1748 /* set up params related to Wnd and Wnp nodes */
1749 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */
1750 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */
1751 }
1752 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get
1753 * at RAID information */
1754 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
1755 if (nfaults == 2) {
1756 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, qname, allocList);
1757 for (i = 0; i < numDataNodes; i++) {
1758 /* set up params related to Rod */
1759 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
1760 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */
1761 }
1762 /* and read old q */
1763 qNodes[0].params[2 * numDataNodes + 0] = readQNodes[0].params[0]; /* pda */
1764 qNodes[0].params[2 * numDataNodes + 1] = readQNodes[0].params[1]; /* buffer pointer */
1765 for (i = 0; i < numDataNodes; i++) {
1766 /* set up params related to Wnd nodes */
1767 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */
1768 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */
1769 }
1770 			qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;	/* q node needs to get
1771 * at RAID information */
1772 qNodes[0].results[0] = readQNodes[0].params[1].p;
1773 }
1774 }
1775
1776 /* initialize nodes which write new parity (Wnp) */
1777 pda = asmap->parityInfo;
1778 for (i = 0; i < numParityNodes; i++) {
1779 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnp", allocList);
1780 RF_ASSERT(pda != NULL);
1781 		writeParityNodes[i].params[0].p = pda;	/* physical disk addr
1782 							 * desc */
1783 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for
1784 * parity write
1785 * operation */
1786 writeParityNodes[i].params[2].v = parityStripeID;
1787 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1788
1789 if (lu_flag) {
1790 /* initialize node to unlock the disk queue */
1791 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp", allocList);
1792 unlockParityNodes[i].params[0].p = pda; /* physical disk addr
1793 * desc */
1794 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1795 }
1796 pda = pda->next;
1797 }
1798
1799 /* initialize nodes which write new Q (Wnq) */
1800 if (nfaults == 2) {
1801 pda = asmap->qInfo;
1802 for (i = 0; i < numParityNodes; i++) {
1803 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnq", allocList);
1804 RF_ASSERT(pda != NULL);
1805 			writeQNodes[i].params[0].p = pda;	/* physical disk addr
1806 								 * desc */
1807 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for
1808 										 * Q write
1809 * operation */
1810 writeQNodes[i].params[2].v = parityStripeID;
1811 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1812
1813 if (lu_flag) {
1814 /* initialize node to unlock the disk queue */
1815 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unq", allocList);
1816 unlockQNodes[i].params[0].p = pda; /* physical disk addr
1817 * desc */
1818 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1819 }
1820 pda = pda->next;
1821 }
1822 }
1823 /* Step 4. connect the nodes */
1824
1825 /* connect header to block node */
1826 dag_h->succedents[0] = blockNode;
1827
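	/*
	 * Succedent layout of the block node: [0 .. numDataNodes-1] are the
	 * Rod nodes, [numDataNodes .. numDataNodes+numParityNodes-1] are the
	 * Rop nodes, and (when nfaults == 2) the next numParityNodes slots
	 * are the Roq nodes.
	 */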
1828 /* connect block node to read old data nodes */
1829 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
1830 for (i = 0; i < numDataNodes; i++) {
1831 blockNode->succedents[i] = &readDataNodes[i];
1832 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
1833 readDataNodes[i].antecedents[0] = blockNode;
1834 readDataNodes[i].antType[0] = rf_control;
1835 }
1836
1837 /* connect block node to read old parity nodes */
1838 for (i = 0; i < numParityNodes; i++) {
1839 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
1840 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
1841 readParityNodes[i].antecedents[0] = blockNode;
1842 readParityNodes[i].antType[0] = rf_control;
1843 }
1844
1845 /* connect block node to read old Q nodes */
1846 if (nfaults == 2)
1847 for (i = 0; i < numParityNodes; i++) {
1848 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
1849 RF_ASSERT(readQNodes[i].numAntecedents == 1);
1850 readQNodes[i].antecedents[0] = blockNode;
1851 readQNodes[i].antType[0] = rf_control;
1852 }
1853
1854 /* connect read old data nodes to write new data nodes */
1855 for (i = 0; i < numDataNodes; i++) {
1856 RF_ASSERT(readDataNodes[i].numSuccedents == ((nfaults * numParityNodes) + 1));
1857 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
1858 readDataNodes[i].succedents[0] = &writeDataNodes[i];
1859 writeDataNodes[i].antecedents[0] = &readDataNodes[i];
1860 writeDataNodes[i].antType[0] = rf_antiData;
1861 }
1862
1863 /* connect read old data nodes to xor nodes */
1864 for (i = 0; i < numDataNodes; i++) {
1865 for (j = 0; j < numParityNodes; j++) {
1866 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
1867 readDataNodes[i].succedents[1 + j] = &xorNodes[j];
1868 xorNodes[j].antecedents[i] = &readDataNodes[i];
1869 xorNodes[j].antType[i] = rf_trueData;
1870 }
1871 }
1872
1873 /* connect read old data nodes to q nodes */
1874 if (nfaults == 2)
1875 for (i = 0; i < numDataNodes; i++)
1876 for (j = 0; j < numParityNodes; j++) {
1877 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
1878 readDataNodes[i].succedents[1 + numParityNodes + j] = &qNodes[j];
1879 qNodes[j].antecedents[i] = &readDataNodes[i];
1880 qNodes[j].antType[i] = rf_trueData;
1881 }
1882
1883 /* connect read old parity nodes to xor nodes */
1884 for (i = 0; i < numParityNodes; i++) {
1885 for (j = 0; j < numParityNodes; j++) {
1886 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
1887 readParityNodes[i].succedents[j] = &xorNodes[j];
1888 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
1889 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
1890 }
1891 }
1892
1893 /* connect read old q nodes to q nodes */
1894 if (nfaults == 2)
1895 for (i = 0; i < numParityNodes; i++) {
1896 for (j = 0; j < numParityNodes; j++) {
1897 RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
1898 readQNodes[i].succedents[j] = &qNodes[j];
1899 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
1900 qNodes[j].antType[numDataNodes + i] = rf_trueData;
1901 }
1902 }
1903
1904 /* connect xor nodes to the write new parity nodes */
1905 for (i = 0; i < numParityNodes; i++) {
1906 RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
1907 for (j = 0; j < numParityNodes; j++) {
1908 RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
1909 xorNodes[i].succedents[j] = &writeParityNodes[j];
1910 writeParityNodes[j].antecedents[i] = &xorNodes[i];
1911 writeParityNodes[j].antType[i] = rf_trueData;
1912 }
1913 }
1914
1915 /* connect q nodes to the write new q nodes */
1916 if (nfaults == 2)
1917 for (i = 0; i < numParityNodes; i++) {
1918 RF_ASSERT(writeQNodes[i].numAntecedents == numParityNodes);
1919 for (j = 0; j < numParityNodes; j++) {
1920 				RF_ASSERT(qNodes[j].numSuccedents == numParityNodes);
1921 qNodes[i].succedents[j] = &writeQNodes[j];
1922 writeQNodes[j].antecedents[i] = &qNodes[i];
1923 writeQNodes[j].antType[i] = rf_trueData;
1924 }
1925 }
1926
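	/*
	 * Antecedent layout of the term node: [0 .. numDataNodes-1] are the
	 * Wnd (or Und) nodes, the next numParityNodes slots are the Wnp (or
	 * Unp) nodes, and (when nfaults == 2) the final numParityNodes slots
	 * are the Wnq (or Unq) nodes.
	 */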
1927 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1928 RF_ASSERT(termNode->numSuccedents == 0);
1929 for (i = 0; i < numDataNodes; i++) {
1930 if (lu_flag) {
1931 /* connect write new data nodes to unlock nodes */
1932 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1933 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1934 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1935 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1936 unlockDataNodes[i].antType[0] = rf_control;
1937
1938 /* connect unlock nodes to term node */
1939 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1940 unlockDataNodes[i].succedents[0] = termNode;
1941 termNode->antecedents[i] = &unlockDataNodes[i];
1942 termNode->antType[i] = rf_control;
1943 } else {
1944 /* connect write new data nodes to term node */
1945 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1946 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1947 writeDataNodes[i].succedents[0] = termNode;
1948 termNode->antecedents[i] = &writeDataNodes[i];
1949 termNode->antType[i] = rf_control;
1950 }
1951 }
1952
1953 for (i = 0; i < numParityNodes; i++) {
1954 if (lu_flag) {
1955 /* connect write new parity nodes to unlock nodes */
1956 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1957 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1958 writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
1959 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
1960 unlockParityNodes[i].antType[0] = rf_control;
1961
1962 /* connect unlock nodes to term node */
1963 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1964 unlockParityNodes[i].succedents[0] = termNode;
1965 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
1966 termNode->antType[numDataNodes + i] = rf_control;
1967 } else {
1968 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1969 writeParityNodes[i].succedents[0] = termNode;
1970 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
1971 termNode->antType[numDataNodes + i] = rf_control;
1972 }
1973 }
1974
1975 if (nfaults == 2)
1976 for (i = 0; i < numParityNodes; i++) {
1977 if (lu_flag) {
1978 /* connect write new Q nodes to unlock nodes */
1979 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1980 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1981 writeQNodes[i].succedents[0] = &unlockQNodes[i];
1982 unlockQNodes[i].antecedents[0] = &writeQNodes[i];
1983 unlockQNodes[i].antType[0] = rf_control;
1984
1985 				/* connect unlock nodes to term node */
1986 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1987 unlockQNodes[i].succedents[0] = termNode;
1988 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
1989 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1990 } else {
1991 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1992 writeQNodes[i].succedents[0] = termNode;
1993 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
1994 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1995 }
1996 }
1997 }
1998
1999
2000
2001 /******************************************************************************
2002 * create a write graph (fault-free or degraded) for RAID level 1
2003 *
2004  * Hdr -> Nil -> Wpd --> Nil -> Trm
2005  *            \-> Wsd -/
2006 *
2007 * The "Wpd" node writes data to the primary copy in the mirror pair
2008 * The "Wsd" node writes data to the secondary copy in the mirror pair
2009 *
2010 * Parameters: raidPtr - description of the physical array
2011 * asmap - logical & physical addresses for this access
2012 * bp - buffer ptr (holds write data)
2013 * flags - general flags (e.g. disk locking)
2014 * allocList - list of memory allocated in DAG creation
2015 *****************************************************************************/
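
/*
 * Worked example of the node count: a fault-free write that is not
 * stripe-unit aligned has nWndNodes = 2 and nWmirNodes = 2, so the DAG
 * contains 2 + 2 + 3 = 7 nodes (2 Wpd, 2 Wsd, block, unblock, and
 * terminate); an aligned fault-free write needs only 1 + 1 + 3 = 5.
 */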
2016
2017 void
2018 rf_CreateRaidOneWriteDAGFwd(
2019 RF_Raid_t * raidPtr,
2020 RF_AccessStripeMap_t * asmap,
2021 RF_DagHeader_t * dag_h,
2022 void *bp,
2023 RF_RaidAccessFlags_t flags,
2024 RF_AllocListElem_t * allocList)
2025 {
2026 RF_DagNode_t *blockNode, *unblockNode, *termNode;
2027 RF_DagNode_t *nodes, *wndNode, *wmirNode;
2028 int nWndNodes, nWmirNodes, i;
2029 RF_ReconUnitNum_t which_ru;
2030 RF_PhysDiskAddr_t *pda, *pdaP;
2031 RF_StripeNum_t parityStripeID;
2032
2033 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
2034 asmap->raidAddress, &which_ru);
2035 if (rf_dagDebug) {
2036 printf("[Creating RAID level 1 write DAG]\n");
2037 }
2038 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; /* 2 implies access not
2039 * SU aligned */
2040 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
2041
2042 	/* adjust the Wnd and Wmir node counts if a unit has failed (degraded mode) */
2043 if (asmap->numDataFailed == 1)
2044 nWndNodes--;
2045 if (asmap->numParityFailed == 1)
2046 nWmirNodes--;
2047
2048 /* total number of nodes = nWndNodes + nWmirNodes + (block + unblock +
2049 * terminator) */
2050 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
2051 i = 0;
2052 wndNode = &nodes[i];
2053 i += nWndNodes;
2054 wmirNode = &nodes[i];
2055 i += nWmirNodes;
2056 blockNode = &nodes[i];
2057 i += 1;
2058 unblockNode = &nodes[i];
2059 i += 1;
2060 termNode = &nodes[i];
2061 i += 1;
2062 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
2063
2064 /* this dag can commit immediately */
2065 dag_h->numCommitNodes = 0;
2066 dag_h->numCommits = 0;
2067 dag_h->numSuccedents = 1;
2068
2069 	/* initialize the block, unblock, and term nodes */
2070 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Nil", allocList);
2071 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
2072 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
2073
2074 /* initialize the wnd nodes */
2075 if (nWndNodes > 0) {
2076 pda = asmap->physInfo;
2077 for (i = 0; i < nWndNodes; i++) {
2078 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
2079 RF_ASSERT(pda != NULL);
2080 wndNode[i].params[0].p = pda;
2081 wndNode[i].params[1].p = pda->bufPtr;
2082 wndNode[i].params[2].v = parityStripeID;
2083 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2084 pda = pda->next;
2085 }
2086 RF_ASSERT(pda == NULL);
2087 }
2088 /* initialize the mirror nodes */
2089 if (nWmirNodes > 0) {
2090 pda = asmap->physInfo;
2091 pdaP = asmap->parityInfo;
2092 for (i = 0; i < nWmirNodes; i++) {
2093 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
2094 RF_ASSERT(pda != NULL);
2095 wmirNode[i].params[0].p = pdaP;
2096 wmirNode[i].params[1].p = pda->bufPtr;
2097 wmirNode[i].params[2].v = parityStripeID;
2098 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2099 pda = pda->next;
2100 pdaP = pdaP->next;
2101 }
2102 RF_ASSERT(pda == NULL);
2103 RF_ASSERT(pdaP == NULL);
2104 }
2105 /* link the header node to the block node */
2106 RF_ASSERT(dag_h->numSuccedents == 1);
2107 RF_ASSERT(blockNode->numAntecedents == 0);
2108 dag_h->succedents[0] = blockNode;
2109
2110 /* link the block node to the write nodes */
2111 RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
2112 for (i = 0; i < nWndNodes; i++) {
2113 RF_ASSERT(wndNode[i].numAntecedents == 1);
2114 blockNode->succedents[i] = &wndNode[i];
2115 wndNode[i].antecedents[0] = blockNode;
2116 wndNode[i].antType[0] = rf_control;
2117 }
2118 for (i = 0; i < nWmirNodes; i++) {
2119 RF_ASSERT(wmirNode[i].numAntecedents == 1);
2120 blockNode->succedents[i + nWndNodes] = &wmirNode[i];
2121 wmirNode[i].antecedents[0] = blockNode;
2122 wmirNode[i].antType[0] = rf_control;
2123 }
2124
2125 /* link the write nodes to the unblock node */
2126 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
2127 for (i = 0; i < nWndNodes; i++) {
2128 RF_ASSERT(wndNode[i].numSuccedents == 1);
2129 wndNode[i].succedents[0] = unblockNode;
2130 unblockNode->antecedents[i] = &wndNode[i];
2131 unblockNode->antType[i] = rf_control;
2132 }
2133 for (i = 0; i < nWmirNodes; i++) {
2134 RF_ASSERT(wmirNode[i].numSuccedents == 1);
2135 wmirNode[i].succedents[0] = unblockNode;
2136 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
2137 unblockNode->antType[i + nWndNodes] = rf_control;
2138 }
2139
2140 /* link the unblock node to the term node */
2141 RF_ASSERT(unblockNode->numSuccedents == 1);
2142 RF_ASSERT(termNode->numAntecedents == 1);
2143 RF_ASSERT(termNode->numSuccedents == 0);
2144 unblockNode->succedents[0] = termNode;
2145 termNode->antecedents[0] = unblockNode;
2146 termNode->antType[0] = rf_control;
2147
2148 return;
2149 }
2150