/*	$NetBSD: rf_dagffwr.c,v 1.5 2000/01/07 03:40:58 oster Exp $	*/
2 /*
3 * Copyright (c) 1995 Carnegie-Mellon University.
4 * All rights reserved.
5 *
6 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
7 *
8 * Permission to use, copy, modify and distribute this software and
9 * its documentation is hereby granted, provided that both the copyright
10 * notice and this permission notice appear in all copies of the
11 * software, derivative works or modified versions, and any portions
12 * thereof, and that both notices appear in supporting documentation.
13 *
14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
17 *
18 * Carnegie Mellon requests users of this software to return to
19 *
 * Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
21 * School of Computer Science
22 * Carnegie Mellon University
23 * Pittsburgh PA 15213-3890
24 *
25 * any improvements or extensions that they make and grant Carnegie the
26 * rights to redistribute these changes.
27 */
28
/*
 * rf_dagffwr.c
 *
 * code for creating fault-free write DAGs
 *
 */
35
36 #include "rf_types.h"
37 #include "rf_raid.h"
38 #include "rf_dag.h"
39 #include "rf_dagutils.h"
40 #include "rf_dagfuncs.h"
41 #include "rf_debugMem.h"
42 #include "rf_dagffrd.h"
43 #include "rf_memchunk.h"
44 #include "rf_general.h"
45 #include "rf_dagffwr.h"
46
/******************************************************************************
 *
 * General comments on DAG creation:
 *
 * All DAGs in this file use roll-away error recovery. Each DAG has a single
 * commit node, usually called "Cmt." If an error occurs before the Cmt node
 * is reached, the execution engine will halt forward execution and work
 * backward through the graph, executing the undo functions. Assuming that
 * each node in the graph prior to the Cmt node is undoable and atomic - or -
 * makes no changes to permanent state, the graph will fail atomically.
 * If an error occurs after the Cmt node executes, the engine will roll-forward
 * through the graph, blindly executing nodes until it reaches the end.
 * If a graph reaches the end, it is assumed to have completed successfully.
 *
 * A graph has only 1 Cmt node.
 *
 */
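/*
 * A minimal, self-contained sketch of the roll-away policy described above.
 * The "toy" types and the engine loop are hypothetical simplifications for
 * illustration (they are not the RAIDframe execution engine), but they show
 * the intended behavior around the single commit point: undo everything on a
 * failure before the commit node, roll forward blindly after it.
 */
#if 0
struct toy_node {
	const char *name;
	int (*doit)(void);	/* forward action, returns 0 on success */
	int (*undo)(void);	/* compensating (undo) action */
};

static int
toy_execute(struct toy_node *nodes, int n, int commit)
{
	int i;

	for (i = 0; i < n; i++) {
		if (nodes[i].doit() != 0 && i < commit) {
			/* failure before the commit node: back out */
			while (--i >= 0)
				(void) nodes[i].undo();
			return (-1);	/* the access fails atomically */
		}
		/* failures at or after the commit node are rolled through */
	}
	return (0);		/* reached the end: assumed successful */
}
#endif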
64
65
66 /******************************************************************************
67 *
68 * The following wrappers map the standard DAG creation interface to the
69 * DAG creation routines. Additionally, these wrappers enable experimentation
70 * with new DAG structures by providing an extra level of indirection, allowing
71 * the DAG creation routines to be replaced at this single point.
72 */
73
74
75 void
76 rf_CreateNonRedundantWriteDAG(
77 RF_Raid_t * raidPtr,
78 RF_AccessStripeMap_t * asmap,
79 RF_DagHeader_t * dag_h,
80 void *bp,
81 RF_RaidAccessFlags_t flags,
82 RF_AllocListElem_t * allocList,
83 RF_IoType_t type)
84 {
85 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
86 RF_IO_TYPE_WRITE);
87 }
88
89 void
90 rf_CreateRAID0WriteDAG(
91 RF_Raid_t * raidPtr,
92 RF_AccessStripeMap_t * asmap,
93 RF_DagHeader_t * dag_h,
94 void *bp,
95 RF_RaidAccessFlags_t flags,
96 RF_AllocListElem_t * allocList,
97 RF_IoType_t type)
98 {
99 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
100 RF_IO_TYPE_WRITE);
101 }
102
103 void
104 rf_CreateSmallWriteDAG(
105 RF_Raid_t * raidPtr,
106 RF_AccessStripeMap_t * asmap,
107 RF_DagHeader_t * dag_h,
108 void *bp,
109 RF_RaidAccessFlags_t flags,
110 RF_AllocListElem_t * allocList)
111 {
112 /* "normal" rollaway */
113 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
114 &rf_xorFuncs, NULL);
115 }
116
117 void
118 rf_CreateLargeWriteDAG(
119 RF_Raid_t * raidPtr,
120 RF_AccessStripeMap_t * asmap,
121 RF_DagHeader_t * dag_h,
122 void *bp,
123 RF_RaidAccessFlags_t flags,
124 RF_AllocListElem_t * allocList)
125 {
126 /* "normal" rollaway */
127 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
128 1, rf_RegularXorFunc, RF_TRUE);
129 }
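/*
 * Because these wrappers share the (raidPtr, asmap, dag_h, bp, flags,
 * allocList) calling pattern, the indirection they provide can be made
 * concrete with a function-pointer table.  The typedef and table below are
 * purely illustrative (invented names, not part of RAIDframe); the real
 * selection of a DAG creation routine happens elsewhere.
 */
#if 0
typedef void (*toy_write_dag_creator_t) (RF_Raid_t *, RF_AccessStripeMap_t *,
    RF_DagHeader_t *, void *, RF_RaidAccessFlags_t, RF_AllocListElem_t *);

static const toy_write_dag_creator_t toy_write_dag_creators[] = {
	rf_CreateSmallWriteDAG,	/* read-modify-write of part of a stripe */
	rf_CreateLargeWriteDAG	/* reconstruct-write of (most of) a stripe */
};
#endif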
130
131
132 /******************************************************************************
133 *
134 * DAG creation code begins here
135 */
136
137
/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *            / Rod \           / Wnd \
 * H -- block- Rod - Xor - Cmt - Wnd --- T
 *            \ Rod /           \ Wnp /
 *                               \[Wnq]/
 *
 * The XOR node also does the Q calculation in the P+Q architecture.
 * All nodes before the commit node (Cmt) are assumed to be atomic and
 * undoable - or - they make no changes to permanent state.
 *
 * Rod = read old data
 * Cmt = commit node
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              nfaults   - number of faults array can tolerate
 *                          (equal to # redundancy units in stripe)
 *              redfuncs  - list of redundancy generating functions
 *
 *****************************************************************************/
168
169 void
170 rf_CommonCreateLargeWriteDAG(
171 RF_Raid_t * raidPtr,
172 RF_AccessStripeMap_t * asmap,
173 RF_DagHeader_t * dag_h,
174 void *bp,
175 RF_RaidAccessFlags_t flags,
176 RF_AllocListElem_t * allocList,
177 int nfaults,
178 int (*redFunc) (RF_DagNode_t *),
179 int allowBufferRecycle)
180 {
181 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
182 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
183 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
184 RF_AccessStripeMapHeader_t *new_asm_h[2];
185 RF_StripeNum_t parityStripeID;
186 char *sosBuffer, *eosBuffer;
187 RF_ReconUnitNum_t which_ru;
188 RF_RaidLayout_t *layoutPtr;
189 RF_PhysDiskAddr_t *pda;
190
191 layoutPtr = &(raidPtr->Layout);
192 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress,
193 &which_ru);
194
195 if (rf_dagDebug) {
196 printf("[Creating large-write DAG]\n");
197 }
198 dag_h->creator = "LargeWriteDAG";
199
200 dag_h->numCommitNodes = 1;
201 dag_h->numCommits = 0;
202 dag_h->numSuccedents = 1;
203
204 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
205 nWndNodes = asmap->numStripeUnitsAccessed;
206 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
207 (RF_DagNode_t *), allocList);
208 i = 0;
209 wndNodes = &nodes[i];
210 i += nWndNodes;
211 xorNode = &nodes[i];
212 i += 1;
213 wnpNode = &nodes[i];
214 i += 1;
215 blockNode = &nodes[i];
216 i += 1;
217 commitNode = &nodes[i];
218 i += 1;
219 termNode = &nodes[i];
220 i += 1;
221 if (nfaults == 2) {
222 wnqNode = &nodes[i];
223 i += 1;
224 } else {
225 wnqNode = NULL;
226 }
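	/*
	 * Example: with nfaults == 1 the array just carved up holds the
	 * nWndNodes Wnd nodes plus the Xor, Wnp, Nil, Cmt and Trm nodes
	 * (nWndNodes + 5 entries), matching the nWndNodes + 4 + nfaults
	 * allocation above; nfaults == 2 adds the Wnq node.
	 */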
227 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h,
228 &nRodNodes, &sosBuffer, &eosBuffer, allocList);
229 if (nRodNodes > 0) {
230 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
231 (RF_DagNode_t *), allocList);
232 } else {
233 rodNodes = NULL;
234 }
235
236 /* begin node initialization */
237 if (nRodNodes > 0) {
238 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
239 NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
240 } else {
241 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
242 NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
243 }
244
245 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL,
246 nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList);
247 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL,
248 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
249
250 /* initialize the Rod nodes */
251 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
252 if (new_asm_h[asmNum]) {
253 pda = new_asm_h[asmNum]->stripeMap->physInfo;
254 while (pda) {
255 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,
256 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
257 "Rod", allocList);
258 rodNodes[nodeNum].params[0].p = pda;
259 rodNodes[nodeNum].params[1].p = pda->bufPtr;
260 rodNodes[nodeNum].params[2].v = parityStripeID;
261 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
262 0, 0, which_ru);
263 nodeNum++;
264 pda = pda->next;
265 }
266 }
267 }
268 RF_ASSERT(nodeNum == nRodNodes);
269
270 /* initialize the wnd nodes */
271 pda = asmap->physInfo;
272 for (i = 0; i < nWndNodes; i++) {
273 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
274 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
275 RF_ASSERT(pda != NULL);
276 wndNodes[i].params[0].p = pda;
277 wndNodes[i].params[1].p = pda->bufPtr;
278 wndNodes[i].params[2].v = parityStripeID;
279 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
280 pda = pda->next;
281 }
282
283 /* initialize the redundancy node */
284 if (nRodNodes > 0) {
285 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
286 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
287 "Xr ", allocList);
288 } else {
289 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
290 1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
291 }
292 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
293 for (i = 0; i < nWndNodes; i++) {
294 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */
295 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */
296 }
297 for (i = 0; i < nRodNodes; i++) {
298 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */
299 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */
300 }
301 /* xor node needs to get at RAID information */
302 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
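	/*
	 * At this point the xor node's parameter list holds
	 * 2 * (nWndNodes + nRodNodes) slots of {pda, buffer} pairs followed
	 * by one slot holding raidPtr.  A hypothetical redundancy function
	 * (a sketch only, not rf_RegularXorFunc) would recover its inputs as
	 * follows:
	 */
#if 0
	{
		int k, nPairs = nWndNodes + nRodNodes;
		RF_PhysDiskAddr_t *src_pda;
		char *src_buf;
		RF_Raid_t *rp;

		for (k = 0; k < nPairs; k++) {
			src_pda = (RF_PhysDiskAddr_t *) xorNode->params[2 * k + 0].p;
			src_buf = (char *) xorNode->params[2 * k + 1].p;
			/* XOR src_buf into the target buffer at the offset
			 * implied by src_pda */
		}
		rp = (RF_Raid_t *) xorNode->params[2 * nPairs].p;
	}
#endif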
303
304 /*
305 * Look for an Rod node that reads a complete SU. If none, alloc a buffer
306 * to receive the parity info. Note that we can't use a new data buffer
307 * because it will not have gotten written when the xor occurs.
308 */
309 if (allowBufferRecycle) {
310 for (i = 0; i < nRodNodes; i++) {
311 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
312 break;
313 }
314 }
315 if ((!allowBufferRecycle) || (i == nRodNodes)) {
316 RF_CallocAndAdd(xorNode->results[0], 1,
317 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
318 (void *), allocList);
319 } else {
320 xorNode->results[0] = rodNodes[i].params[1].p;
321 }
322
323 /* initialize the Wnp node */
324 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
325 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
326 wnpNode->params[0].p = asmap->parityInfo;
327 wnpNode->params[1].p = xorNode->results[0];
328 wnpNode->params[2].v = parityStripeID;
329 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
330 /* parityInfo must describe entire parity unit */
331 RF_ASSERT(asmap->parityInfo->next == NULL);
332
333 if (nfaults == 2) {
		/*
		 * We never try to recycle a buffer for the Q calculation
		 * in addition to the parity. This would cause two buffers
		 * to get smashed during the P and Q calculation, guaranteeing
		 * one would be wrong.
		 */
340 RF_CallocAndAdd(xorNode->results[1], 1,
341 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
342 (void *), allocList);
343 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
344 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
345 wnqNode->params[0].p = asmap->qInfo;
346 wnqNode->params[1].p = xorNode->results[1];
347 wnqNode->params[2].v = parityStripeID;
348 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
349 /* parityInfo must describe entire parity unit */
350 RF_ASSERT(asmap->parityInfo->next == NULL);
351 }
352 /*
353 * Connect nodes to form graph.
354 */
355
356 /* connect dag header to block node */
357 RF_ASSERT(blockNode->numAntecedents == 0);
358 dag_h->succedents[0] = blockNode;
359
360 if (nRodNodes > 0) {
361 /* connect the block node to the Rod nodes */
362 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
363 RF_ASSERT(xorNode->numAntecedents == nRodNodes);
364 for (i = 0; i < nRodNodes; i++) {
365 RF_ASSERT(rodNodes[i].numAntecedents == 1);
366 blockNode->succedents[i] = &rodNodes[i];
367 rodNodes[i].antecedents[0] = blockNode;
368 rodNodes[i].antType[0] = rf_control;
369
370 /* connect the Rod nodes to the Xor node */
371 RF_ASSERT(rodNodes[i].numSuccedents == 1);
372 rodNodes[i].succedents[0] = xorNode;
373 xorNode->antecedents[i] = &rodNodes[i];
374 xorNode->antType[i] = rf_trueData;
375 }
376 } else {
377 /* connect the block node to the Xor node */
378 RF_ASSERT(blockNode->numSuccedents == 1);
379 RF_ASSERT(xorNode->numAntecedents == 1);
380 blockNode->succedents[0] = xorNode;
381 xorNode->antecedents[0] = blockNode;
382 xorNode->antType[0] = rf_control;
383 }
384
385 /* connect the xor node to the commit node */
386 RF_ASSERT(xorNode->numSuccedents == 1);
387 RF_ASSERT(commitNode->numAntecedents == 1);
388 xorNode->succedents[0] = commitNode;
389 commitNode->antecedents[0] = xorNode;
390 commitNode->antType[0] = rf_control;
391
392 /* connect the commit node to the write nodes */
393 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
394 for (i = 0; i < nWndNodes; i++) {
395 RF_ASSERT(wndNodes->numAntecedents == 1);
396 commitNode->succedents[i] = &wndNodes[i];
397 wndNodes[i].antecedents[0] = commitNode;
398 wndNodes[i].antType[0] = rf_control;
399 }
400 RF_ASSERT(wnpNode->numAntecedents == 1);
401 commitNode->succedents[nWndNodes] = wnpNode;
402 wnpNode->antecedents[0] = commitNode;
403 wnpNode->antType[0] = rf_trueData;
404 if (nfaults == 2) {
405 RF_ASSERT(wnqNode->numAntecedents == 1);
406 commitNode->succedents[nWndNodes + 1] = wnqNode;
407 wnqNode->antecedents[0] = commitNode;
408 wnqNode->antType[0] = rf_trueData;
409 }
410 /* connect the write nodes to the term node */
411 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
412 RF_ASSERT(termNode->numSuccedents == 0);
413 for (i = 0; i < nWndNodes; i++) {
414 RF_ASSERT(wndNodes->numSuccedents == 1);
415 wndNodes[i].succedents[0] = termNode;
416 termNode->antecedents[i] = &wndNodes[i];
417 termNode->antType[i] = rf_control;
418 }
419 RF_ASSERT(wnpNode->numSuccedents == 1);
420 wnpNode->succedents[0] = termNode;
421 termNode->antecedents[nWndNodes] = wnpNode;
422 termNode->antType[nWndNodes] = rf_control;
423 if (nfaults == 2) {
424 RF_ASSERT(wnqNode->numSuccedents == 1);
425 wnqNode->succedents[0] = termNode;
426 termNode->antecedents[nWndNodes + 1] = wnqNode;
427 termNode->antType[nWndNodes + 1] = rf_control;
428 }
429 }
430 /******************************************************************************
431 *
432 * creates a DAG to perform a small-write operation (either raid 5 or pq),
433 * which is as follows:
434 *
435 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
436 * \- Rod X / \----> Wnd [Und]-/
437 * [\- Rod X / \---> Wnd [Und]-/]
438 * [\- Roq -> Q / \--> Wnq [Unq]-/]
439 *
440 * Rop = read old parity
441 * Rod = read old data
442 * Roq = read old "q"
443 * Cmt = commit node
444 * Und = unlock data disk
445 * Unp = unlock parity disk
446 * Unq = unlock q disk
447 * Wnp = write new parity
448 * Wnd = write new data
449 * Wnq = write new "q"
450 * [ ] denotes optional segments in the graph
451 *
452 * Parameters: raidPtr - description of the physical array
453 * asmap - logical & physical addresses for this access
454 * bp - buffer ptr (holds write data)
455 * flags - general flags (e.g. disk locking)
456 * allocList - list of memory allocated in DAG creation
457 * pfuncs - list of parity generating functions
458 * qfuncs - list of q generating functions
459 *
460 * A null qfuncs indicates single fault tolerant
461 *****************************************************************************/
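/*
 * The read-modify-write update implemented by this graph rests on the
 * identity  P_new = P_old ^ D_old ^ D_new,  applied byte-wise over each data
 * unit being overwritten.  The self-contained sketch below (toy buffers and
 * invented helper names, not RAIDframe code) demonstrates the identity and
 * checks it against recomputing parity from scratch.
 */
#if 0
#include <assert.h>
#include <stddef.h>

static void
toy_small_write_parity(unsigned char *parity, const unsigned char *old_data,
    const unsigned char *new_data, size_t len)
{
	size_t i;

	for (i = 0; i < len; i++)
		parity[i] ^= old_data[i] ^ new_data[i];
}

static void
toy_small_write_check(void)
{
	unsigned char d0[4] = { 1, 2, 3, 4 };	/* old data in unit 0 */
	unsigned char d1[4] = { 5, 6, 7, 8 };	/* data in unit 1 (unchanged) */
	unsigned char nd0[4] = { 9, 9, 9, 9 };	/* new data for unit 0 */
	unsigned char p[4];
	size_t i;

	for (i = 0; i < 4; i++)
		p[i] = d0[i] ^ d1[i];		/* full-stripe parity */
	toy_small_write_parity(p, d0, nd0, 4);
	for (i = 0; i < 4; i++)
		assert(p[i] == (unsigned char)(nd0[i] ^ d1[i]));
}
#endif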
462
463 void
464 rf_CommonCreateSmallWriteDAG(
465 RF_Raid_t * raidPtr,
466 RF_AccessStripeMap_t * asmap,
467 RF_DagHeader_t * dag_h,
468 void *bp,
469 RF_RaidAccessFlags_t flags,
470 RF_AllocListElem_t * allocList,
471 RF_RedFuncs_t * pfuncs,
472 RF_RedFuncs_t * qfuncs)
473 {
474 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
475 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
476 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
477 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
478 int i, j, nNodes, totalNumNodes, lu_flag;
479 RF_ReconUnitNum_t which_ru;
480 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
481 int (*qfunc) (RF_DagNode_t *);
482 int numDataNodes, numParityNodes;
483 RF_StripeNum_t parityStripeID;
484 RF_PhysDiskAddr_t *pda;
485 char *name, *qname;
486 long nfaults;
487
488 nfaults = qfuncs ? 2 : 1;
489 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
490
491 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
492 asmap->raidAddress, &which_ru);
493 pda = asmap->physInfo;
494 numDataNodes = asmap->numStripeUnitsAccessed;
495 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
496
497 if (rf_dagDebug) {
498 printf("[Creating small-write DAG]\n");
499 }
500 RF_ASSERT(numDataNodes > 0);
501 dag_h->creator = "SmallWriteDAG";
502
503 dag_h->numCommitNodes = 1;
504 dag_h->numCommits = 0;
505 dag_h->numSuccedents = 1;
506
507 /*
508 * DAG creation occurs in four steps:
509 * 1. count the number of nodes in the DAG
510 * 2. create the nodes
511 * 3. initialize the nodes
512 * 4. connect the nodes
513 */
514
515 /*
516 * Step 1. compute number of nodes in the graph
517 */
518
	/* number of nodes: a read and write for each data unit, a redundancy
	 * computation node for each parity node (nfaults * nparity), a read
	 * and write for each parity unit, a block and commit node (2), a
	 * terminate node, and, if atomic RMW, an unlock node for each data
	 * unit and redundancy unit */
524 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
525 + (nfaults * 2 * numParityNodes) + 3;
526 if (lu_flag) {
527 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
528 }
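	/*
	 * Worked example (nfaults == 1, no locking): an access touching two
	 * data units with a single contiguous parity unit needs
	 * 2*2 (Rod + Wnd) + 1*1 (Xor) + 1*2*1 (Rop + Wnp) + 3 (Nil, Cmt, Trm)
	 * = 10 nodes.
	 */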
529 /*
530 * Step 2. create the nodes
531 */
532 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
533 (RF_DagNode_t *), allocList);
534 i = 0;
535 blockNode = &nodes[i];
536 i += 1;
537 commitNode = &nodes[i];
538 i += 1;
539 readDataNodes = &nodes[i];
540 i += numDataNodes;
541 readParityNodes = &nodes[i];
542 i += numParityNodes;
543 writeDataNodes = &nodes[i];
544 i += numDataNodes;
545 writeParityNodes = &nodes[i];
546 i += numParityNodes;
547 xorNodes = &nodes[i];
548 i += numParityNodes;
549 termNode = &nodes[i];
550 i += 1;
551 if (lu_flag) {
552 unlockDataNodes = &nodes[i];
553 i += numDataNodes;
554 unlockParityNodes = &nodes[i];
555 i += numParityNodes;
556 } else {
557 unlockDataNodes = unlockParityNodes = NULL;
558 }
559 if (nfaults == 2) {
560 readQNodes = &nodes[i];
561 i += numParityNodes;
562 writeQNodes = &nodes[i];
563 i += numParityNodes;
564 qNodes = &nodes[i];
565 i += numParityNodes;
566 if (lu_flag) {
567 unlockQNodes = &nodes[i];
568 i += numParityNodes;
569 } else {
570 unlockQNodes = NULL;
571 }
572 } else {
573 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
574 }
575 RF_ASSERT(i == totalNumNodes);
576
577 /*
578 * Step 3. initialize the nodes
579 */
580 /* initialize block node (Nil) */
581 nNodes = numDataNodes + (nfaults * numParityNodes);
582 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
583 NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
584
585 /* initialize commit node (Cmt) */
586 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
587 NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);
588
589 /* initialize terminate node (Trm) */
590 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
591 NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
592
593 /* initialize nodes which read old data (Rod) */
594 for (i = 0; i < numDataNodes; i++) {
595 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
596 rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h,
597 "Rod", allocList);
598 RF_ASSERT(pda != NULL);
599 /* physical disk addr desc */
600 readDataNodes[i].params[0].p = pda;
601 /* buffer to hold old data */
602 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
603 dag_h, pda, allocList);
604 readDataNodes[i].params[2].v = parityStripeID;
605 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
606 lu_flag, 0, which_ru);
607 pda = pda->next;
608 for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
609 readDataNodes[i].propList[j] = NULL;
610 }
611 }
612
613 /* initialize nodes which read old parity (Rop) */
614 pda = asmap->parityInfo;
615 i = 0;
616 for (i = 0; i < numParityNodes; i++) {
617 RF_ASSERT(pda != NULL);
618 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc,
619 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4,
620 0, dag_h, "Rop", allocList);
621 readParityNodes[i].params[0].p = pda;
622 /* buffer to hold old parity */
623 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
624 dag_h, pda, allocList);
625 readParityNodes[i].params[2].v = parityStripeID;
626 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
627 lu_flag, 0, which_ru);
628 pda = pda->next;
		for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
			readParityNodes[i].propList[j] = NULL;
		}
632 }
633
634 /* initialize nodes which read old Q (Roq) */
635 if (nfaults == 2) {
636 pda = asmap->qInfo;
637 for (i = 0; i < numParityNodes; i++) {
638 RF_ASSERT(pda != NULL);
639 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
640 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
641 readQNodes[i].params[0].p = pda;
642 /* buffer to hold old Q */
643 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda,
644 allocList);
645 readQNodes[i].params[2].v = parityStripeID;
646 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
647 lu_flag, 0, which_ru);
648 pda = pda->next;
			for (j = 0; j < readQNodes[i].numSuccedents; j++) {
				readQNodes[i].propList[j] = NULL;
			}
652 }
653 }
654 /* initialize nodes which write new data (Wnd) */
655 pda = asmap->physInfo;
656 for (i = 0; i < numDataNodes; i++) {
657 RF_ASSERT(pda != NULL);
658 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
659 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
660 "Wnd", allocList);
661 /* physical disk addr desc */
662 writeDataNodes[i].params[0].p = pda;
663 /* buffer holding new data to be written */
664 writeDataNodes[i].params[1].p = pda->bufPtr;
665 writeDataNodes[i].params[2].v = parityStripeID;
666 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
667 0, 0, which_ru);
668 if (lu_flag) {
669 /* initialize node to unlock the disk queue */
670 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
671 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
672 "Und", allocList);
673 /* physical disk addr desc */
674 unlockDataNodes[i].params[0].p = pda;
675 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
676 0, lu_flag, which_ru);
677 }
678 pda = pda->next;
679 }
680
681 /*
682 * Initialize nodes which compute new parity and Q.
683 */
684 /*
685 * We use the simple XOR func in the double-XOR case, and when
686 * we're accessing only a portion of one stripe unit. The distinction
687 * between the two is that the regular XOR func assumes that the targbuf
688 * is a full SU in size, and examines the pda associated with the buffer
689 * to decide where within the buffer to XOR the data, whereas
690 * the simple XOR func just XORs the data into the start of the buffer.
691 */
692 if ((numParityNodes == 2) || ((numDataNodes == 1)
693 && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
694 func = pfuncs->simple;
695 undoFunc = rf_NullNodeUndoFunc;
696 name = pfuncs->SimpleName;
697 if (qfuncs) {
698 qfunc = qfuncs->simple;
699 qname = qfuncs->SimpleName;
700 } else {
701 qfunc = NULL;
702 qname = NULL;
703 }
704 } else {
705 func = pfuncs->regular;
706 undoFunc = rf_NullNodeUndoFunc;
707 name = pfuncs->RegularName;
708 if (qfuncs) {
709 qfunc = qfuncs->regular;
710 qname = qfuncs->RegularName;
711 } else {
712 qfunc = NULL;
713 qname = NULL;
714 }
715 }
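	/*
	 * The difference can be sketched with plain byte buffers (sizes,
	 * names and the offset below are hypothetical, for illustration
	 * only): the "regular" style treats its target as a full stripe
	 * unit and XORs each source in at the byte offset its pda implies,
	 * while the "simple" style always XORs into the start of the target.
	 */
#if 0
	{
		unsigned char targ_full_su[1024];	/* regular: a full SU */
		unsigned char targ_partial[64];		/* simple: just the data */
		unsigned char srcbuf[64];
		unsigned int off = 256;	/* offset of srcbuf within its SU */
		unsigned int k;

		for (k = 0; k < sizeof(srcbuf); k++) {
			targ_full_su[off + k] ^= srcbuf[k];	/* "regular" */
			targ_partial[k] ^= srcbuf[k];		/* "simple" */
		}
	}
#endif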
716 /*
717 * Initialize the xor nodes: params are {pda,buf}
718 * from {Rod,Wnd,Rop} nodes, and raidPtr
719 */
720 if (numParityNodes == 2) {
721 /* double-xor case */
722 for (i = 0; i < numParityNodes; i++) {
723 /* note: no wakeup func for xor */
724 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL,
725 1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList);
726 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
727 xorNodes[i].params[0] = readDataNodes[i].params[0];
728 xorNodes[i].params[1] = readDataNodes[i].params[1];
729 xorNodes[i].params[2] = readParityNodes[i].params[0];
730 xorNodes[i].params[3] = readParityNodes[i].params[1];
731 xorNodes[i].params[4] = writeDataNodes[i].params[0];
732 xorNodes[i].params[5] = writeDataNodes[i].params[1];
733 xorNodes[i].params[6].p = raidPtr;
734 /* use old parity buf as target buf */
735 xorNodes[i].results[0] = readParityNodes[i].params[1].p;
736 if (nfaults == 2) {
737 /* note: no wakeup func for qor */
738 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
739 (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList);
740 qNodes[i].params[0] = readDataNodes[i].params[0];
741 qNodes[i].params[1] = readDataNodes[i].params[1];
742 qNodes[i].params[2] = readQNodes[i].params[0];
743 qNodes[i].params[3] = readQNodes[i].params[1];
744 qNodes[i].params[4] = writeDataNodes[i].params[0];
745 qNodes[i].params[5] = writeDataNodes[i].params[1];
746 qNodes[i].params[6].p = raidPtr;
747 /* use old Q buf as target buf */
748 qNodes[i].results[0] = readQNodes[i].params[1].p;
749 }
750 }
751 } else {
752 /* there is only one xor node in this case */
753 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1,
754 (numDataNodes + numParityNodes),
755 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
756 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
757 for (i = 0; i < numDataNodes + 1; i++) {
758 /* set up params related to Rod and Rop nodes */
759 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
760 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */
761 }
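		/*
		 * Note: the final (i == numDataNodes) iteration above indexes
		 * one element past readDataNodes[]; because readParityNodes[]
		 * immediately follows it in the calloc'd node array, that last
		 * {pda, buffer} pair belongs to the Rop node, which is what
		 * the comment above intends.
		 */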
762 for (i = 0; i < numDataNodes; i++) {
763 /* set up params related to Wnd and Wnp nodes */
764 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */
765 writeDataNodes[i].params[0];
766 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */
767 writeDataNodes[i].params[1];
768 }
769 /* xor node needs to get at RAID information */
770 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
771 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
772 if (nfaults == 2) {
773 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
774 (numDataNodes + numParityNodes),
775 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
776 qname, allocList);
777 for (i = 0; i < numDataNodes; i++) {
778 /* set up params related to Rod */
779 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
780 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */
781 }
782 /* and read old q */
783 qNodes[0].params[2 * numDataNodes + 0] = /* pda */
784 readQNodes[0].params[0];
785 qNodes[0].params[2 * numDataNodes + 1] = /* buffer ptr */
786 readQNodes[0].params[1];
787 for (i = 0; i < numDataNodes; i++) {
788 /* set up params related to Wnd nodes */
789 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */
790 writeDataNodes[i].params[0];
791 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */
792 writeDataNodes[i].params[1];
793 }
794 /* xor node needs to get at RAID information */
795 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
796 qNodes[0].results[0] = readQNodes[0].params[1].p;
797 }
798 }
799
800 /* initialize nodes which write new parity (Wnp) */
801 pda = asmap->parityInfo;
802 for (i = 0; i < numParityNodes; i++) {
803 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
804 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
805 "Wnp", allocList);
806 RF_ASSERT(pda != NULL);
807 writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr)
808 * filled in by xor node */
809 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for
810 * parity write
811 * operation */
812 writeParityNodes[i].params[2].v = parityStripeID;
813 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
814 0, 0, which_ru);
815 if (lu_flag) {
816 /* initialize node to unlock the disk queue */
817 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
818 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
819 "Unp", allocList);
820 unlockParityNodes[i].params[0].p = pda; /* physical disk addr
821 * desc */
822 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
823 0, lu_flag, which_ru);
824 }
825 pda = pda->next;
826 }
827
828 /* initialize nodes which write new Q (Wnq) */
829 if (nfaults == 2) {
830 pda = asmap->qInfo;
831 for (i = 0; i < numParityNodes; i++) {
832 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
833 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
834 "Wnq", allocList);
835 RF_ASSERT(pda != NULL);
836 writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr)
837 * filled in by xor node */
838 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for
839 * parity write
840 * operation */
841 writeQNodes[i].params[2].v = parityStripeID;
842 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
843 0, 0, which_ru);
844 if (lu_flag) {
845 /* initialize node to unlock the disk queue */
846 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
847 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
848 "Unq", allocList);
849 unlockQNodes[i].params[0].p = pda; /* physical disk addr
850 * desc */
851 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
852 0, lu_flag, which_ru);
853 }
854 pda = pda->next;
855 }
856 }
857 /*
858 * Step 4. connect the nodes.
859 */
860
861 /* connect header to block node */
862 dag_h->succedents[0] = blockNode;
863
864 /* connect block node to read old data nodes */
865 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
866 for (i = 0; i < numDataNodes; i++) {
867 blockNode->succedents[i] = &readDataNodes[i];
868 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
869 readDataNodes[i].antecedents[0] = blockNode;
870 readDataNodes[i].antType[0] = rf_control;
871 }
872
873 /* connect block node to read old parity nodes */
874 for (i = 0; i < numParityNodes; i++) {
875 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
876 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
877 readParityNodes[i].antecedents[0] = blockNode;
878 readParityNodes[i].antType[0] = rf_control;
879 }
880
881 /* connect block node to read old Q nodes */
882 if (nfaults == 2) {
883 for (i = 0; i < numParityNodes; i++) {
884 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
885 RF_ASSERT(readQNodes[i].numAntecedents == 1);
886 readQNodes[i].antecedents[0] = blockNode;
887 readQNodes[i].antType[0] = rf_control;
888 }
889 }
890 /* connect read old data nodes to xor nodes */
891 for (i = 0; i < numDataNodes; i++) {
892 RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
893 for (j = 0; j < numParityNodes; j++) {
894 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
895 readDataNodes[i].succedents[j] = &xorNodes[j];
896 xorNodes[j].antecedents[i] = &readDataNodes[i];
897 xorNodes[j].antType[i] = rf_trueData;
898 }
899 }
900
901 /* connect read old data nodes to q nodes */
902 if (nfaults == 2) {
903 for (i = 0; i < numDataNodes; i++) {
904 for (j = 0; j < numParityNodes; j++) {
905 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
906 readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
907 qNodes[j].antecedents[i] = &readDataNodes[i];
908 qNodes[j].antType[i] = rf_trueData;
909 }
910 }
911 }
912 /* connect read old parity nodes to xor nodes */
913 for (i = 0; i < numParityNodes; i++) {
914 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
915 for (j = 0; j < numParityNodes; j++) {
916 readParityNodes[i].succedents[j] = &xorNodes[j];
917 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
918 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
919 }
920 }
921
922 /* connect read old q nodes to q nodes */
923 if (nfaults == 2) {
924 for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
926 for (j = 0; j < numParityNodes; j++) {
927 readQNodes[i].succedents[j] = &qNodes[j];
928 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
929 qNodes[j].antType[numDataNodes + i] = rf_trueData;
930 }
931 }
932 }
933 /* connect xor nodes to commit node */
934 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
935 for (i = 0; i < numParityNodes; i++) {
936 RF_ASSERT(xorNodes[i].numSuccedents == 1);
937 xorNodes[i].succedents[0] = commitNode;
938 commitNode->antecedents[i] = &xorNodes[i];
939 commitNode->antType[i] = rf_control;
940 }
941
942 /* connect q nodes to commit node */
943 if (nfaults == 2) {
944 for (i = 0; i < numParityNodes; i++) {
945 RF_ASSERT(qNodes[i].numSuccedents == 1);
946 qNodes[i].succedents[0] = commitNode;
947 commitNode->antecedents[i + numParityNodes] = &qNodes[i];
948 commitNode->antType[i + numParityNodes] = rf_control;
949 }
950 }
951 /* connect commit node to write nodes */
952 RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
953 for (i = 0; i < numDataNodes; i++) {
954 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
955 commitNode->succedents[i] = &writeDataNodes[i];
956 writeDataNodes[i].antecedents[0] = commitNode;
957 writeDataNodes[i].antType[0] = rf_trueData;
958 }
959 for (i = 0; i < numParityNodes; i++) {
960 RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
961 commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
962 writeParityNodes[i].antecedents[0] = commitNode;
963 writeParityNodes[i].antType[0] = rf_trueData;
964 }
965 if (nfaults == 2) {
966 for (i = 0; i < numParityNodes; i++) {
967 RF_ASSERT(writeQNodes[i].numAntecedents == 1);
968 commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
969 writeQNodes[i].antecedents[0] = commitNode;
970 writeQNodes[i].antType[0] = rf_trueData;
971 }
972 }
973 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
974 RF_ASSERT(termNode->numSuccedents == 0);
975 for (i = 0; i < numDataNodes; i++) {
976 if (lu_flag) {
977 /* connect write new data nodes to unlock nodes */
978 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
979 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
980 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
981 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
982 unlockDataNodes[i].antType[0] = rf_control;
983
984 /* connect unlock nodes to term node */
985 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
986 unlockDataNodes[i].succedents[0] = termNode;
987 termNode->antecedents[i] = &unlockDataNodes[i];
988 termNode->antType[i] = rf_control;
989 } else {
990 /* connect write new data nodes to term node */
991 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
992 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
993 writeDataNodes[i].succedents[0] = termNode;
994 termNode->antecedents[i] = &writeDataNodes[i];
995 termNode->antType[i] = rf_control;
996 }
997 }
998
999 for (i = 0; i < numParityNodes; i++) {
1000 if (lu_flag) {
1001 /* connect write new parity nodes to unlock nodes */
1002 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1003 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1004 writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
1005 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
1006 unlockParityNodes[i].antType[0] = rf_control;
1007
1008 /* connect unlock nodes to term node */
1009 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1010 unlockParityNodes[i].succedents[0] = termNode;
1011 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
1012 termNode->antType[numDataNodes + i] = rf_control;
1013 } else {
1014 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1015 writeParityNodes[i].succedents[0] = termNode;
1016 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
1017 termNode->antType[numDataNodes + i] = rf_control;
1018 }
1019 }
1020
1021 if (nfaults == 2) {
1022 for (i = 0; i < numParityNodes; i++) {
1023 if (lu_flag) {
1024 /* connect write new Q nodes to unlock nodes */
1025 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1026 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1027 writeQNodes[i].succedents[0] = &unlockQNodes[i];
1028 unlockQNodes[i].antecedents[0] = &writeQNodes[i];
1029 unlockQNodes[i].antType[0] = rf_control;
1030
				/* connect unlock nodes to term node */
1032 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1033 unlockQNodes[i].succedents[0] = termNode;
1034 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
1035 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1036 } else {
1037 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1038 writeQNodes[i].succedents[0] = termNode;
1039 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
1040 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1041 }
1042 }
1043 }
1044 }
1045
1046
1047 /******************************************************************************
1048 * create a write graph (fault-free or degraded) for RAID level 1
1049 *
1050 * Hdr -> Commit -> Wpd -> Nil -> Trm
1051 * -> Wsd ->
1052 *
1053 * The "Wpd" node writes data to the primary copy in the mirror pair
1054 * The "Wsd" node writes data to the secondary copy in the mirror pair
1055 *
1056 * Parameters: raidPtr - description of the physical array
1057 * asmap - logical & physical addresses for this access
1058 * bp - buffer ptr (holds write data)
1059 * flags - general flags (e.g. disk locking)
1060 * allocList - list of memory allocated in DAG creation
1061 *****************************************************************************/
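/*
 * No redundancy has to be computed for a mirrored write, which is why this
 * DAG can commit immediately: the only work is landing the same data buffer
 * on both copies.  A toy, self-contained illustration (in-memory "disks" and
 * invented names, not RAIDframe code):
 */
#if 0
#include <stddef.h>
#include <string.h>

static void
toy_mirror_write(unsigned char *primary, unsigned char *secondary,
    const unsigned char *data, size_t len)
{
	memcpy(primary, data, len);
	memcpy(secondary, data, len);
}
#endif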
1062
1063 void
1064 rf_CreateRaidOneWriteDAG(
1065 RF_Raid_t * raidPtr,
1066 RF_AccessStripeMap_t * asmap,
1067 RF_DagHeader_t * dag_h,
1068 void *bp,
1069 RF_RaidAccessFlags_t flags,
1070 RF_AllocListElem_t * allocList)
1071 {
1072 RF_DagNode_t *unblockNode, *termNode, *commitNode;
1073 RF_DagNode_t *nodes, *wndNode, *wmirNode;
1074 int nWndNodes, nWmirNodes, i;
1075 RF_ReconUnitNum_t which_ru;
1076 RF_PhysDiskAddr_t *pda, *pdaP;
1077 RF_StripeNum_t parityStripeID;
1078
1079 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1080 asmap->raidAddress, &which_ru);
1081 if (rf_dagDebug) {
1082 printf("[Creating RAID level 1 write DAG]\n");
1083 }
1084 dag_h->creator = "RaidOneWriteDAG";
1085
1086 /* 2 implies access not SU aligned */
1087 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
1088 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
1089
1090 /* alloc the Wnd nodes and the Wmir node */
1091 if (asmap->numDataFailed == 1)
1092 nWndNodes--;
1093 if (asmap->numParityFailed == 1)
1094 nWmirNodes--;
1095
1096 /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock
1097 * + terminator) */
1098 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
1099 (RF_DagNode_t *), allocList);
1100 i = 0;
1101 wndNode = &nodes[i];
1102 i += nWndNodes;
1103 wmirNode = &nodes[i];
1104 i += nWmirNodes;
1105 commitNode = &nodes[i];
1106 i += 1;
1107 unblockNode = &nodes[i];
1108 i += 1;
1109 termNode = &nodes[i];
1110 i += 1;
1111 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
1112
1113 /* this dag can commit immediately */
1114 dag_h->numCommitNodes = 1;
1115 dag_h->numCommits = 0;
1116 dag_h->numSuccedents = 1;
1117
1118 /* initialize the commit, unblock, and term nodes */
1119 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
1120 NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList);
1121 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
1122 NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
1123 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
1124 NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
1125
1126 /* initialize the wnd nodes */
1127 if (nWndNodes > 0) {
1128 pda = asmap->physInfo;
1129 for (i = 0; i < nWndNodes; i++) {
1130 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1131 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
1132 RF_ASSERT(pda != NULL);
1133 wndNode[i].params[0].p = pda;
1134 wndNode[i].params[1].p = pda->bufPtr;
1135 wndNode[i].params[2].v = parityStripeID;
1136 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1137 pda = pda->next;
1138 }
1139 RF_ASSERT(pda == NULL);
1140 }
1141 /* initialize the mirror nodes */
1142 if (nWmirNodes > 0) {
1143 pda = asmap->physInfo;
1144 pdaP = asmap->parityInfo;
1145 for (i = 0; i < nWmirNodes; i++) {
1146 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1147 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
1148 RF_ASSERT(pda != NULL);
1149 wmirNode[i].params[0].p = pdaP;
1150 wmirNode[i].params[1].p = pda->bufPtr;
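			/*
			 * Note: the mirror write just set up is addressed
			 * through the parity pda (the secondary copy's
			 * location) but carries the primary copy's data
			 * buffer.
			 */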
1151 wmirNode[i].params[2].v = parityStripeID;
1152 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1153 pda = pda->next;
1154 pdaP = pdaP->next;
1155 }
1156 RF_ASSERT(pda == NULL);
1157 RF_ASSERT(pdaP == NULL);
1158 }
1159 /* link the header node to the commit node */
1160 RF_ASSERT(dag_h->numSuccedents == 1);
1161 RF_ASSERT(commitNode->numAntecedents == 0);
1162 dag_h->succedents[0] = commitNode;
1163
1164 /* link the commit node to the write nodes */
1165 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
1166 for (i = 0; i < nWndNodes; i++) {
1167 RF_ASSERT(wndNode[i].numAntecedents == 1);
1168 commitNode->succedents[i] = &wndNode[i];
1169 wndNode[i].antecedents[0] = commitNode;
1170 wndNode[i].antType[0] = rf_control;
1171 }
1172 for (i = 0; i < nWmirNodes; i++) {
1173 RF_ASSERT(wmirNode[i].numAntecedents == 1);
1174 commitNode->succedents[i + nWndNodes] = &wmirNode[i];
1175 wmirNode[i].antecedents[0] = commitNode;
1176 wmirNode[i].antType[0] = rf_control;
1177 }
1178
1179 /* link the write nodes to the unblock node */
1180 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
1181 for (i = 0; i < nWndNodes; i++) {
1182 RF_ASSERT(wndNode[i].numSuccedents == 1);
1183 wndNode[i].succedents[0] = unblockNode;
1184 unblockNode->antecedents[i] = &wndNode[i];
1185 unblockNode->antType[i] = rf_control;
1186 }
1187 for (i = 0; i < nWmirNodes; i++) {
1188 RF_ASSERT(wmirNode[i].numSuccedents == 1);
1189 wmirNode[i].succedents[0] = unblockNode;
1190 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
1191 unblockNode->antType[i + nWndNodes] = rf_control;
1192 }
1193
1194 /* link the unblock node to the term node */
1195 RF_ASSERT(unblockNode->numSuccedents == 1);
1196 RF_ASSERT(termNode->numAntecedents == 1);
1197 RF_ASSERT(termNode->numSuccedents == 0);
1198 unblockNode->succedents[0] = termNode;
1199 termNode->antecedents[0] = unblockNode;
1200 termNode->antType[0] = rf_control;
1201 }
1202
1203
1204
/* DAGs which have no commit points.
 *
 * The following DAGs are used in forward and backward error recovery experiments.
 * They are identical to the DAGs above this comment with the exception that
 * the commit points have been removed.
 */
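/*
 * Structurally, the "Fwd" graphs below set dag_h->numCommitNodes to 0 and
 * drop the Cmt node; in the large-write variant a plain Nil synchronization
 * node sits after the reads and fans out to both the data writes and the
 * XOR instead.
 */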
1211
1212
1213
1214 void
1215 rf_CommonCreateLargeWriteDAGFwd(
1216 RF_Raid_t * raidPtr,
1217 RF_AccessStripeMap_t * asmap,
1218 RF_DagHeader_t * dag_h,
1219 void *bp,
1220 RF_RaidAccessFlags_t flags,
1221 RF_AllocListElem_t * allocList,
1222 int nfaults,
1223 int (*redFunc) (RF_DagNode_t *),
1224 int allowBufferRecycle)
1225 {
1226 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
1227 RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
1228 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
1229 RF_AccessStripeMapHeader_t *new_asm_h[2];
1230 RF_StripeNum_t parityStripeID;
1231 char *sosBuffer, *eosBuffer;
1232 RF_ReconUnitNum_t which_ru;
1233 RF_RaidLayout_t *layoutPtr;
1234 RF_PhysDiskAddr_t *pda;
1235
1236 layoutPtr = &(raidPtr->Layout);
1237 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
1238
1239 if (rf_dagDebug)
1240 printf("[Creating large-write DAG]\n");
1241 dag_h->creator = "LargeWriteDAGFwd";
1242
1243 dag_h->numCommitNodes = 0;
1244 dag_h->numCommits = 0;
1245 dag_h->numSuccedents = 1;
1246
	/* alloc the nodes: Wnd, xor, sync, block, term, and Wnp */
1248 nWndNodes = asmap->numStripeUnitsAccessed;
1249 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1250 i = 0;
1251 wndNodes = &nodes[i];
1252 i += nWndNodes;
1253 xorNode = &nodes[i];
1254 i += 1;
1255 wnpNode = &nodes[i];
1256 i += 1;
1257 blockNode = &nodes[i];
1258 i += 1;
1259 syncNode = &nodes[i];
1260 i += 1;
1261 termNode = &nodes[i];
1262 i += 1;
1263 if (nfaults == 2) {
1264 wnqNode = &nodes[i];
1265 i += 1;
1266 } else {
1267 wnqNode = NULL;
1268 }
1269 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
1270 if (nRodNodes > 0) {
1271 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1272 } else {
1273 rodNodes = NULL;
1274 }
1275
1276 /* begin node initialization */
1277 if (nRodNodes > 0) {
1278 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
1279 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0, dag_h, "Nil", allocList);
1280 } else {
1281 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
1282 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h, "Nil", allocList);
1283 }
1284
1285 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
1286
1287 /* initialize the Rod nodes */
1288 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
1289 if (new_asm_h[asmNum]) {
1290 pda = new_asm_h[asmNum]->stripeMap->physInfo;
1291 while (pda) {
1292 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList);
1293 rodNodes[nodeNum].params[0].p = pda;
1294 rodNodes[nodeNum].params[1].p = pda->bufPtr;
1295 rodNodes[nodeNum].params[2].v = parityStripeID;
1296 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1297 nodeNum++;
1298 pda = pda->next;
1299 }
1300 }
1301 }
1302 RF_ASSERT(nodeNum == nRodNodes);
1303
1304 /* initialize the wnd nodes */
1305 pda = asmap->physInfo;
1306 for (i = 0; i < nWndNodes; i++) {
1307 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1308 RF_ASSERT(pda != NULL);
1309 wndNodes[i].params[0].p = pda;
1310 wndNodes[i].params[1].p = pda->bufPtr;
1311 wndNodes[i].params[2].v = parityStripeID;
1312 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1313 pda = pda->next;
1314 }
1315
1316 /* initialize the redundancy node */
1317 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
1318 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
1319 for (i = 0; i < nWndNodes; i++) {
1320 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */
1321 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */
1322 }
1323 for (i = 0; i < nRodNodes; i++) {
1324 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */
1325 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */
1326 }
1327 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; /* xor node needs to get
1328 * at RAID information */
1329
1330 /* look for an Rod node that reads a complete SU. If none, alloc a
1331 * buffer to receive the parity info. Note that we can't use a new
1332 * data buffer because it will not have gotten written when the xor
1333 * occurs. */
1334 if (allowBufferRecycle) {
1335 for (i = 0; i < nRodNodes; i++)
1336 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
1337 break;
1338 }
1339 if ((!allowBufferRecycle) || (i == nRodNodes)) {
1340 RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
1341 } else
1342 xorNode->results[0] = rodNodes[i].params[1].p;
1343
1344 /* initialize the Wnp node */
1345 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
1346 wnpNode->params[0].p = asmap->parityInfo;
1347 wnpNode->params[1].p = xorNode->results[0];
1348 wnpNode->params[2].v = parityStripeID;
1349 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1350 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must
1351 * describe entire
1352 * parity unit */
1353
1354 if (nfaults == 2) {
		/* we never try to recycle a buffer for the Q calculation in
		 * addition to the parity. This would cause two buffers to get
		 * smashed during the P and Q calculation, guaranteeing one
		 * would be wrong. */
1359 RF_CallocAndAdd(xorNode->results[1], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
1360 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
1361 wnqNode->params[0].p = asmap->qInfo;
1362 wnqNode->params[1].p = xorNode->results[1];
1363 wnqNode->params[2].v = parityStripeID;
1364 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1365 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must
1366 * describe entire
1367 * parity unit */
1368 }
1369 /* connect nodes to form graph */
1370
1371 /* connect dag header to block node */
1372 RF_ASSERT(blockNode->numAntecedents == 0);
1373 dag_h->succedents[0] = blockNode;
1374
1375 if (nRodNodes > 0) {
1376 /* connect the block node to the Rod nodes */
1377 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
1378 RF_ASSERT(syncNode->numAntecedents == nRodNodes);
1379 for (i = 0; i < nRodNodes; i++) {
1380 RF_ASSERT(rodNodes[i].numAntecedents == 1);
1381 blockNode->succedents[i] = &rodNodes[i];
1382 rodNodes[i].antecedents[0] = blockNode;
1383 rodNodes[i].antType[0] = rf_control;
1384
1385 /* connect the Rod nodes to the Nil node */
1386 RF_ASSERT(rodNodes[i].numSuccedents == 1);
1387 rodNodes[i].succedents[0] = syncNode;
1388 syncNode->antecedents[i] = &rodNodes[i];
1389 syncNode->antType[i] = rf_trueData;
1390 }
1391 } else {
1392 /* connect the block node to the Nil node */
1393 RF_ASSERT(blockNode->numSuccedents == 1);
1394 RF_ASSERT(syncNode->numAntecedents == 1);
1395 blockNode->succedents[0] = syncNode;
1396 syncNode->antecedents[0] = blockNode;
1397 syncNode->antType[0] = rf_control;
1398 }
1399
1400 /* connect the sync node to the Wnd nodes */
1401 RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
1402 for (i = 0; i < nWndNodes; i++) {
1403 RF_ASSERT(wndNodes->numAntecedents == 1);
1404 syncNode->succedents[i] = &wndNodes[i];
1405 wndNodes[i].antecedents[0] = syncNode;
1406 wndNodes[i].antType[0] = rf_control;
1407 }
1408
1409 /* connect the sync node to the Xor node */
1410 RF_ASSERT(xorNode->numAntecedents == 1);
1411 syncNode->succedents[nWndNodes] = xorNode;
1412 xorNode->antecedents[0] = syncNode;
1413 xorNode->antType[0] = rf_control;
1414
1415 /* connect the xor node to the write parity node */
1416 RF_ASSERT(xorNode->numSuccedents == nfaults);
1417 RF_ASSERT(wnpNode->numAntecedents == 1);
1418 xorNode->succedents[0] = wnpNode;
1419 wnpNode->antecedents[0] = xorNode;
1420 wnpNode->antType[0] = rf_trueData;
1421 if (nfaults == 2) {
1422 RF_ASSERT(wnqNode->numAntecedents == 1);
1423 xorNode->succedents[1] = wnqNode;
1424 wnqNode->antecedents[0] = xorNode;
1425 wnqNode->antType[0] = rf_trueData;
1426 }
1427 /* connect the write nodes to the term node */
1428 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
1429 RF_ASSERT(termNode->numSuccedents == 0);
1430 for (i = 0; i < nWndNodes; i++) {
1431 RF_ASSERT(wndNodes->numSuccedents == 1);
1432 wndNodes[i].succedents[0] = termNode;
1433 termNode->antecedents[i] = &wndNodes[i];
1434 termNode->antType[i] = rf_control;
1435 }
1436 RF_ASSERT(wnpNode->numSuccedents == 1);
1437 wnpNode->succedents[0] = termNode;
1438 termNode->antecedents[nWndNodes] = wnpNode;
1439 termNode->antType[nWndNodes] = rf_control;
1440 if (nfaults == 2) {
1441 RF_ASSERT(wnqNode->numSuccedents == 1);
1442 wnqNode->succedents[0] = termNode;
1443 termNode->antecedents[nWndNodes + 1] = wnqNode;
1444 termNode->antType[nWndNodes + 1] = rf_control;
1445 }
1446 }
1447
1448
1449 /******************************************************************************
1450 *
1451 * creates a DAG to perform a small-write operation (either raid 5 or pq),
1452 * which is as follows:
1453 *
1454 * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
1455 * \- Rod X- Wnd [Und] -------/
1456 * [\- Rod X- Wnd [Und] ------/]
1457 * [\- Roq - Q --> Wnq [Unq]-/]
1458 *
1459 * Rop = read old parity
1460 * Rod = read old data
1461 * Roq = read old "q"
1462 * Cmt = commit node
1463 * Und = unlock data disk
1464 * Unp = unlock parity disk
1465 * Unq = unlock q disk
1466 * Wnp = write new parity
1467 * Wnd = write new data
1468 * Wnq = write new "q"
1469 * [ ] denotes optional segments in the graph
1470 *
1471 * Parameters: raidPtr - description of the physical array
1472 * asmap - logical & physical addresses for this access
1473 * bp - buffer ptr (holds write data)
1474 * flags - general flags (e.g. disk locking)
1475 * allocList - list of memory allocated in DAG creation
1476 * pfuncs - list of parity generating functions
1477 * qfuncs - list of q generating functions
1478 *
1479 * A null qfuncs indicates single fault tolerant
1480 *****************************************************************************/
1481
1482 void
1483 rf_CommonCreateSmallWriteDAGFwd(
1484 RF_Raid_t * raidPtr,
1485 RF_AccessStripeMap_t * asmap,
1486 RF_DagHeader_t * dag_h,
1487 void *bp,
1488 RF_RaidAccessFlags_t flags,
1489 RF_AllocListElem_t * allocList,
1490 RF_RedFuncs_t * pfuncs,
1491 RF_RedFuncs_t * qfuncs)
1492 {
1493 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
1494 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
1495 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
1496 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
1497 int i, j, nNodes, totalNumNodes, lu_flag;
1498 RF_ReconUnitNum_t which_ru;
1499 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
1500 int (*qfunc) (RF_DagNode_t *);
1501 int numDataNodes, numParityNodes;
1502 RF_StripeNum_t parityStripeID;
1503 RF_PhysDiskAddr_t *pda;
1504 char *name, *qname;
1505 long nfaults;
1506
1507 nfaults = qfuncs ? 2 : 1;
1508 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
1509
1510 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
1511 pda = asmap->physInfo;
1512 numDataNodes = asmap->numStripeUnitsAccessed;
1513 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
1514
1515 if (rf_dagDebug)
1516 printf("[Creating small-write DAG]\n");
1517 RF_ASSERT(numDataNodes > 0);
1518 dag_h->creator = "SmallWriteDAGFwd";
1519
1520 dag_h->numCommitNodes = 0;
1521 dag_h->numCommits = 0;
1522 dag_h->numSuccedents = 1;
1523
1524 qfunc = NULL;
1525 qname = NULL;
1526
1527 /* DAG creation occurs in four steps: 1. count the number of nodes in
1528 * the DAG 2. create the nodes 3. initialize the nodes 4. connect the
1529 * nodes */
1530
1531 /* Step 1. compute number of nodes in the graph */
1532
1533 /* number of nodes: a read and a write for each data unit; a redundancy
1534 * computation node for each parity node (nfaults * nparity); a read and
1535 * a write for each parity unit; a block node and a terminate node; if
1536 * atomic RMW, an unlock node for each data unit and redundancy unit */
1537 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) + (nfaults * 2 * numParityNodes) + 2;
1538 if (lu_flag)
1539 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
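/* for example, a single-data-unit RAID 5 small write with locking disabled
 * gives (2 * 1) + (1 * 1) + (1 * 2 * 1) + 2 = 7 nodes:
 * Nil, Rod, Rop, Xor, Wnd, Wnp, Trm */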
1540
1541
1542 /* Step 2. create the nodes */
1543 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1544 i = 0;
1545 blockNode = &nodes[i];
1546 i += 1;
1547 readDataNodes = &nodes[i];
1548 i += numDataNodes;
1549 readParityNodes = &nodes[i];
1550 i += numParityNodes;
1551 writeDataNodes = &nodes[i];
1552 i += numDataNodes;
1553 writeParityNodes = &nodes[i];
1554 i += numParityNodes;
1555 xorNodes = &nodes[i];
1556 i += numParityNodes;
1557 termNode = &nodes[i];
1558 i += 1;
1559 if (lu_flag) {
1560 unlockDataNodes = &nodes[i];
1561 i += numDataNodes;
1562 unlockParityNodes = &nodes[i];
1563 i += numParityNodes;
1564 } else {
1565 unlockDataNodes = unlockParityNodes = NULL;
1566 }
1567 if (nfaults == 2) {
1568 readQNodes = &nodes[i];
1569 i += numParityNodes;
1570 writeQNodes = &nodes[i];
1571 i += numParityNodes;
1572 qNodes = &nodes[i];
1573 i += numParityNodes;
1574 if (lu_flag) {
1575 unlockQNodes = &nodes[i];
1576 i += numParityNodes;
1577 } else {
1578 unlockQNodes = NULL;
1579 }
1580 } else {
1581 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
1582 }
1583 RF_ASSERT(i == totalNumNodes);
1584
1585 /* Step 3. initialize the nodes */
1586 /* initialize block node (Nil) */
1587 nNodes = numDataNodes + (nfaults * numParityNodes);
1588 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
1589
1590 /* initialize terminate node (Trm) */
1591 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
1592
1593 /* initialize nodes which read old data (Rod) */
1594 for (i = 0; i < numDataNodes; i++) {
1595 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h, "Rod", allocList);
1596 RF_ASSERT(pda != NULL);
1597 readDataNodes[i].params[0].p = pda; /* physical disk addr
1598 * desc */
1599 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old
1600 * data */
1601 readDataNodes[i].params[2].v = parityStripeID;
1602 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1603 pda = pda->next;
1604 for (j = 0; j < readDataNodes[i].numSuccedents; j++)
1605 readDataNodes[i].propList[j] = NULL;
1606 }
1607
1608 /* initialize nodes which read old parity (Rop) */
1609 pda = asmap->parityInfo;
1610 i = 0;
1611 for (i = 0; i < numParityNodes; i++) {
1612 RF_ASSERT(pda != NULL);
1613 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
1614 readParityNodes[i].params[0].p = pda;
1615 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old
1616 * parity */
1617 readParityNodes[i].params[2].v = parityStripeID;
1618 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1619 for (j = 0; j < readParityNodes[i].numSuccedents; j++)
1620 readParityNodes[i].propList[j] = NULL;
1621 pda = pda->next;
1622 }
1623
1624 /* initialize nodes which read old Q (Roq) */
1625 if (nfaults == 2) {
1626 pda = asmap->qInfo;
1627 for (i = 0; i < numParityNodes; i++) {
1628 RF_ASSERT(pda != NULL);
1629 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
1630 readQNodes[i].params[0].p = pda;
1631 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old Q */
1632 readQNodes[i].params[2].v = parityStripeID;
1633 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1634 for (j = 0; j < readQNodes[i].numSuccedents; j++)
1635 readQNodes[i].propList[j] = NULL;
1636 pda = pda->next;
1637 }
1638 }
1639 /* initialize nodes which write new data (Wnd) */
1640 pda = asmap->physInfo;
1641 for (i = 0; i < numDataNodes; i++) {
1642 RF_ASSERT(pda != NULL);
1643 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1644 writeDataNodes[i].params[0].p = pda; /* physical disk addr
1645 * desc */
1646 writeDataNodes[i].params[1].p = pda->bufPtr; /* buffer holding new
1647 * data to be written */
1648 writeDataNodes[i].params[2].v = parityStripeID;
1649 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1650
1651 if (lu_flag) {
1652 /* initialize node to unlock the disk queue */
1653 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
1654 unlockDataNodes[i].params[0].p = pda; /* physical disk addr
1655 * desc */
1656 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1657 }
1658 pda = pda->next;
1659 }
1660
1661
1662 /* initialize nodes which compute new parity and Q */
1663 /* we use the simple XOR func in the double-XOR case, and when we're
1664 * accessing only a portion of one stripe unit. the distinction
1665 * between the two is that the regular XOR func assumes that the
1666 * targbuf is a full SU in size, and examines the pda associated with
1667 * the buffer to decide where within the buffer to XOR the data,
1668 * whereas the simple XOR func just XORs the data into the start of
1669 * the buffer. */
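/* purely illustrative sketch (not the actual RAIDframe XOR routines):
 * the simple form amounts to
 *     for (k = 0; k < len; k++) dst[k] ^= src[k];
 * while the regular form first derives the byte offset of the source
 * region within the full stripe unit from its pda and applies the same
 * loop at dst + offset */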
1670 if ((numParityNodes == 2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
1671 func = pfuncs->simple;
1672 undoFunc = rf_NullNodeUndoFunc;
1673 name = pfuncs->SimpleName;
1674 if (qfuncs) {
1675 qfunc = qfuncs->simple;
1676 qname = qfuncs->SimpleName;
1677 }
1678 } else {
1679 func = pfuncs->regular;
1680 undoFunc = rf_NullNodeUndoFunc;
1681 name = pfuncs->RegularName;
1682 if (qfuncs) {
1683 qfunc = qfuncs->regular;
1684 qname = qfuncs->RegularName;
1685 }
1686 }
1687 /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop}
1688 * nodes, and raidPtr */
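/* concretely, in the double-xor case below each Xor/Q node sees
 * params[0..1] = Rod {pda, buf}, params[2..3] = Rop/Roq {pda, buf},
 * params[4..5] = Wnd {pda, buf}, and params[6] = raidPtr; in the
 * single-xor case it sees every Rod pair, then the Rop/Roq pair, then
 * every Wnd pair, and finally raidPtr */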
1689 if (numParityNodes == 2) { /* double-xor case */
1690 for (i = 0; i < numParityNodes; i++) {
1691 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, name, allocList); /* no wakeup func for
1692 * xor */
1693 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
1694 xorNodes[i].params[0] = readDataNodes[i].params[0];
1695 xorNodes[i].params[1] = readDataNodes[i].params[1];
1696 xorNodes[i].params[2] = readParityNodes[i].params[0];
1697 xorNodes[i].params[3] = readParityNodes[i].params[1];
1698 xorNodes[i].params[4] = writeDataNodes[i].params[0];
1699 xorNodes[i].params[5] = writeDataNodes[i].params[1];
1700 xorNodes[i].params[6].p = raidPtr;
1701 xorNodes[i].results[0] = readParityNodes[i].params[1].p; /* use old parity buf as
1702 * target buf */
1703 if (nfaults == 2) {
1704 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, qname, allocList); /* no wakeup func for
1705 * xor */
1706 qNodes[i].params[0] = readDataNodes[i].params[0];
1707 qNodes[i].params[1] = readDataNodes[i].params[1];
1708 qNodes[i].params[2] = readQNodes[i].params[0];
1709 qNodes[i].params[3] = readQNodes[i].params[1];
1710 qNodes[i].params[4] = writeDataNodes[i].params[0];
1711 qNodes[i].params[5] = writeDataNodes[i].params[1];
1712 qNodes[i].params[6].p = raidPtr;
1713 qNodes[i].results[0] = readQNodes[i].params[1].p; /* use old Q buf as
1714 * target buf */
1715 }
1716 }
1717 } else {
1718 /* there is only one xor node in this case */
1719 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
1720 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
1721 for (i = 0; i < numDataNodes; i++) {
1722 /* set up params related to Rod nodes */
1723 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
1724 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */
1725 }
/* and read old parity */
xorNodes[0].params[2 * numDataNodes + 0] = readParityNodes[0].params[0]; /* pda */
xorNodes[0].params[2 * numDataNodes + 1] = readParityNodes[0].params[1]; /* buffer pointer */
1726 for (i = 0; i < numDataNodes; i++) {
1727 /* set up params related to Wnd and Wnp nodes */
1728 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */
1729 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */
1730 }
1731 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get
1732 * at RAID information */
1733 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
1734 if (nfaults == 2) {
1735 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, qname, allocList);
1736 for (i = 0; i < numDataNodes; i++) {
1737 /* set up params related to Rod */
1738 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
1739 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */
1740 }
1741 /* and read old q */
1742 qNodes[0].params[2 * numDataNodes + 0] = readQNodes[0].params[0]; /* pda */
1743 qNodes[0].params[2 * numDataNodes + 1] = readQNodes[0].params[1]; /* buffer pointer */
1744 for (i = 0; i < numDataNodes; i++) {
1745 /* set up params related to Wnd nodes */
1746 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */
1747 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */
1748 }
1749 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get
1750 * at RAID information */
1751 qNodes[0].results[0] = readQNodes[0].params[1].p;
1752 }
1753 }
1754
1755 /* initialize nodes which write new parity (Wnp) */
1756 pda = asmap->parityInfo;
1757 for (i = 0; i < numParityNodes; i++) {
1758 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnp", allocList);
1759 RF_ASSERT(pda != NULL);
1760 writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr)
1761 * filled in by xor node */
1762 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for
1763 * parity write
1764 * operation */
1765 writeParityNodes[i].params[2].v = parityStripeID;
1766 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1767
1768 if (lu_flag) {
1769 /* initialize node to unlock the disk queue */
1770 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp", allocList);
1771 unlockParityNodes[i].params[0].p = pda; /* physical disk addr
1772 * desc */
1773 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1774 }
1775 pda = pda->next;
1776 }
1777
1778 /* initialize nodes which write new Q (Wnq) */
1779 if (nfaults == 2) {
1780 pda = asmap->qInfo;
1781 for (i = 0; i < numParityNodes; i++) {
1782 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnq", allocList);
1783 RF_ASSERT(pda != NULL);
1784 writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr)
1785 * filled in by xor node */
1786 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for
1787 * q write
1788 * operation */
1789 writeQNodes[i].params[2].v = parityStripeID;
1790 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1791
1792 if (lu_flag) {
1793 /* initialize node to unlock the disk queue */
1794 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unq", allocList);
1795 unlockQNodes[i].params[0].p = pda; /* physical disk addr
1796 * desc */
1797 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1798 }
1799 pda = pda->next;
1800 }
1801 }
1802 /* Step 4. connect the nodes */
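/* the edges built below are (unlock nodes, when present, are spliced in
 * between each write node and Trm):
 *   Nil -> Rod[i], Rop[j], Roq[j]
 *   Rod[i] -> Wnd[i], Xor[j], Q[j]
 *   Rop[i] -> Xor[j]          Roq[i] -> Q[j]
 *   Xor[i] -> Wnp[j]          Q[i] -> Wnq[j]
 *   Wnd[i], Wnp[j], Wnq[j] -> Trm */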
1803
1804 /* connect header to block node */
1805 dag_h->succedents[0] = blockNode;
1806
1807 /* connect block node to read old data nodes */
1808 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
1809 for (i = 0; i < numDataNodes; i++) {
1810 blockNode->succedents[i] = &readDataNodes[i];
1811 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
1812 readDataNodes[i].antecedents[0] = blockNode;
1813 readDataNodes[i].antType[0] = rf_control;
1814 }
1815
1816 /* connect block node to read old parity nodes */
1817 for (i = 0; i < numParityNodes; i++) {
1818 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
1819 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
1820 readParityNodes[i].antecedents[0] = blockNode;
1821 readParityNodes[i].antType[0] = rf_control;
1822 }
1823
1824 /* connect block node to read old Q nodes */
1825 if (nfaults == 2)
1826 for (i = 0; i < numParityNodes; i++) {
1827 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
1828 RF_ASSERT(readQNodes[i].numAntecedents == 1);
1829 readQNodes[i].antecedents[0] = blockNode;
1830 readQNodes[i].antType[0] = rf_control;
1831 }
1832
1833 /* connect read old data nodes to write new data nodes */
1834 for (i = 0; i < numDataNodes; i++) {
1835 RF_ASSERT(readDataNodes[i].numSuccedents == ((nfaults * numParityNodes) + 1));
1836 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
1837 readDataNodes[i].succedents[0] = &writeDataNodes[i];
1838 writeDataNodes[i].antecedents[0] = &readDataNodes[i];
1839 writeDataNodes[i].antType[0] = rf_antiData;
1840 }
1841
1842 /* connect read old data nodes to xor nodes */
1843 for (i = 0; i < numDataNodes; i++) {
1844 for (j = 0; j < numParityNodes; j++) {
1845 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
1846 readDataNodes[i].succedents[1 + j] = &xorNodes[j];
1847 xorNodes[j].antecedents[i] = &readDataNodes[i];
1848 xorNodes[j].antType[i] = rf_trueData;
1849 }
1850 }
1851
1852 /* connect read old data nodes to q nodes */
1853 if (nfaults == 2)
1854 for (i = 0; i < numDataNodes; i++)
1855 for (j = 0; j < numParityNodes; j++) {
1856 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
1857 readDataNodes[i].succedents[1 + numParityNodes + j] = &qNodes[j];
1858 qNodes[j].antecedents[i] = &readDataNodes[i];
1859 qNodes[j].antType[i] = rf_trueData;
1860 }
1861
1862 /* connect read old parity nodes to xor nodes */
1863 for (i = 0; i < numParityNodes; i++) {
1864 for (j = 0; j < numParityNodes; j++) {
1865 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
1866 readParityNodes[i].succedents[j] = &xorNodes[j];
1867 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
1868 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
1869 }
1870 }
1871
1872 /* connect read old q nodes to q nodes */
1873 if (nfaults == 2)
1874 for (i = 0; i < numParityNodes; i++) {
1875 for (j = 0; j < numParityNodes; j++) {
1876 RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
1877 readQNodes[i].succedents[j] = &qNodes[j];
1878 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
1879 qNodes[j].antType[numDataNodes + i] = rf_trueData;
1880 }
1881 }
1882
1883 /* connect xor nodes to the write new parity nodes */
1884 for (i = 0; i < numParityNodes; i++) {
1885 RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
1886 for (j = 0; j < numParityNodes; j++) {
1887 RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
1888 xorNodes[i].succedents[j] = &writeParityNodes[j];
1889 writeParityNodes[j].antecedents[i] = &xorNodes[i];
1890 writeParityNodes[j].antType[i] = rf_trueData;
1891 }
1892 }
1893
1894 /* connect q nodes to the write new q nodes */
1895 if (nfaults == 2)
1896 for (i = 0; i < numParityNodes; i++) {
1897 RF_ASSERT(writeQNodes[i].numAntecedents == numParityNodes);
1898 for (j = 0; j < numParityNodes; j++) {
1899 RF_ASSERT(qNodes[j].numSuccedents == numParityNodes);
1900 qNodes[i].succedents[j] = &writeQNodes[j];
1901 writeQNodes[j].antecedents[i] = &qNodes[i];
1902 writeQNodes[j].antType[i] = rf_trueData;
1903 }
1904 }
1905
1906 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1907 RF_ASSERT(termNode->numSuccedents == 0);
1908 for (i = 0; i < numDataNodes; i++) {
1909 if (lu_flag) {
1910 /* connect write new data nodes to unlock nodes */
1911 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1912 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1913 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1914 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1915 unlockDataNodes[i].antType[0] = rf_control;
1916
1917 /* connect unlock nodes to term node */
1918 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1919 unlockDataNodes[i].succedents[0] = termNode;
1920 termNode->antecedents[i] = &unlockDataNodes[i];
1921 termNode->antType[i] = rf_control;
1922 } else {
1923 /* connect write new data nodes to term node */
1924 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1925 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1926 writeDataNodes[i].succedents[0] = termNode;
1927 termNode->antecedents[i] = &writeDataNodes[i];
1928 termNode->antType[i] = rf_control;
1929 }
1930 }
1931
1932 for (i = 0; i < numParityNodes; i++) {
1933 if (lu_flag) {
1934 /* connect write new parity nodes to unlock nodes */
1935 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1936 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1937 writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
1938 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
1939 unlockParityNodes[i].antType[0] = rf_control;
1940
1941 /* connect unlock nodes to term node */
1942 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1943 unlockParityNodes[i].succedents[0] = termNode;
1944 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
1945 termNode->antType[numDataNodes + i] = rf_control;
1946 } else {
1947 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1948 writeParityNodes[i].succedents[0] = termNode;
1949 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
1950 termNode->antType[numDataNodes + i] = rf_control;
1951 }
1952 }
1953
1954 if (nfaults == 2)
1955 for (i = 0; i < numParityNodes; i++) {
1956 if (lu_flag) {
1957 /* connect write new Q nodes to unlock nodes */
1958 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1959 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1960 writeQNodes[i].succedents[0] = &unlockQNodes[i];
1961 unlockQNodes[i].antecedents[0] = &writeQNodes[i];
1962 unlockQNodes[i].antType[0] = rf_control;
1963
1964 /* connect unlock nodes to term node */
1965 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1966 unlockQNodes[i].succedents[0] = termNode;
1967 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
1968 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1969 } else {
1970 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1971 writeQNodes[i].succedents[0] = termNode;
1972 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
1973 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1974 }
1975 }
1976 }
1977
1978
1979
1980 /******************************************************************************
1981 * create a write graph (fault-free or degraded) for RAID level 1
1982 *
1983 * Hdr -> Nil -> Wpd -> Nil -> Trm
1984 *            \-> Wsd --/
1985 *
1986 * The "Wpd" node writes data to the primary copy in the mirror pair
1987 * The "Wsd" node writes data to the secondary copy in the mirror pair
1988 *
1989 * Parameters: raidPtr - description of the physical array
1990 * asmap - logical & physical addresses for this access
1991 * bp - buffer ptr (holds write data)
1992 * flags - general flags (e.g. disk locking)
1993 * allocList - list of memory allocated in DAG creation
1994 *****************************************************************************/
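/*
 * For a stripe-unit-aligned, fault-free access this builds five nodes in
 * all (block Nil, Wpd, Wsd, unblock Nil, Trm); an access that is not SU
 * aligned gets a second Wpd and Wsd, and a failed data or mirror disk
 * drops the corresponding write node.
 */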
1995
1996 void
1997 rf_CreateRaidOneWriteDAGFwd(
1998 RF_Raid_t * raidPtr,
1999 RF_AccessStripeMap_t * asmap,
2000 RF_DagHeader_t * dag_h,
2001 void *bp,
2002 RF_RaidAccessFlags_t flags,
2003 RF_AllocListElem_t * allocList)
2004 {
2005 RF_DagNode_t *blockNode, *unblockNode, *termNode;
2006 RF_DagNode_t *nodes, *wndNode, *wmirNode;
2007 int nWndNodes, nWmirNodes, i;
2008 RF_ReconUnitNum_t which_ru;
2009 RF_PhysDiskAddr_t *pda, *pdaP;
2010 RF_StripeNum_t parityStripeID;
2011
2012 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
2013 asmap->raidAddress, &which_ru);
2014 if (rf_dagDebug) {
2015 printf("[Creating RAID level 1 write DAG]\n");
2016 }
2017 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; /* 2 implies access not
2018 * SU aligned */
2019 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
2020
2021 /* in degraded mode, omit the write to the failed copy */
2022 if (asmap->numDataFailed == 1)
2023 nWndNodes--;
2024 if (asmap->numParityFailed == 1)
2025 nWmirNodes--;
2026
2027 /* total number of nodes = nWndNodes + nWmirNodes + (block + unblock +
2028 * terminator) */
2029 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
2030 i = 0;
2031 wndNode = &nodes[i];
2032 i += nWndNodes;
2033 wmirNode = &nodes[i];
2034 i += nWmirNodes;
2035 blockNode = &nodes[i];
2036 i += 1;
2037 unblockNode = &nodes[i];
2038 i += 1;
2039 termNode = &nodes[i];
2040 i += 1;
2041 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
2042
2043 /* this dag can commit immediately */
2044 dag_h->numCommitNodes = 0;
2045 dag_h->numCommits = 0;
2046 dag_h->numSuccedents = 1;
2047
2048 /* initialize the unblock and term nodes */
2049 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Nil", allocList);
2050 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
2051 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
2052
2053 /* initialize the wnd nodes */
2054 if (nWndNodes > 0) {
2055 pda = asmap->physInfo;
2056 for (i = 0; i < nWndNodes; i++) {
2057 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
2058 RF_ASSERT(pda != NULL);
2059 wndNode[i].params[0].p = pda;
2060 wndNode[i].params[1].p = pda->bufPtr;
2061 wndNode[i].params[2].v = parityStripeID;
2062 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2063 pda = pda->next;
2064 }
2065 RF_ASSERT(pda == NULL);
2066 }
2067 /* initialize the mirror nodes */
2068 if (nWmirNodes > 0) {
2069 pda = asmap->physInfo;
2070 pdaP = asmap->parityInfo;
2071 for (i = 0; i < nWmirNodes; i++) {
2072 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
2073 RF_ASSERT(pda != NULL);
2074 wmirNode[i].params[0].p = pdaP;
2075 wmirNode[i].params[1].p = pda->bufPtr;
2076 wmirNode[i].params[2].v = parityStripeID;
2077 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2078 pda = pda->next;
2079 pdaP = pdaP->next;
2080 }
2081 RF_ASSERT(pda == NULL);
2082 RF_ASSERT(pdaP == NULL);
2083 }
2084 /* link the header node to the block node */
2085 RF_ASSERT(dag_h->numSuccedents == 1);
2086 RF_ASSERT(blockNode->numAntecedents == 0);
2087 dag_h->succedents[0] = blockNode;
2088
2089 /* link the block node to the write nodes */
2090 RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
2091 for (i = 0; i < nWndNodes; i++) {
2092 RF_ASSERT(wndNode[i].numAntecedents == 1);
2093 blockNode->succedents[i] = &wndNode[i];
2094 wndNode[i].antecedents[0] = blockNode;
2095 wndNode[i].antType[0] = rf_control;
2096 }
2097 for (i = 0; i < nWmirNodes; i++) {
2098 RF_ASSERT(wmirNode[i].numAntecedents == 1);
2099 blockNode->succedents[i + nWndNodes] = &wmirNode[i];
2100 wmirNode[i].antecedents[0] = blockNode;
2101 wmirNode[i].antType[0] = rf_control;
2102 }
2103
2104 /* link the write nodes to the unblock node */
2105 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
2106 for (i = 0; i < nWndNodes; i++) {
2107 RF_ASSERT(wndNode[i].numSuccedents == 1);
2108 wndNode[i].succedents[0] = unblockNode;
2109 unblockNode->antecedents[i] = &wndNode[i];
2110 unblockNode->antType[i] = rf_control;
2111 }
2112 for (i = 0; i < nWmirNodes; i++) {
2113 RF_ASSERT(wmirNode[i].numSuccedents == 1);
2114 wmirNode[i].succedents[0] = unblockNode;
2115 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
2116 unblockNode->antType[i + nWndNodes] = rf_control;
2117 }
2118
2119 /* link the unblock node to the term node */
2120 RF_ASSERT(unblockNode->numSuccedents == 1);
2121 RF_ASSERT(termNode->numAntecedents == 1);
2122 RF_ASSERT(termNode->numSuccedents == 0);
2123 unblockNode->succedents[0] = termNode;
2124 termNode->antecedents[0] = unblockNode;
2125 termNode->antType[0] = rf_control;
2126
2127 return;
2128 }
2129